http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index d3f9eda..d014a2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -36,7 +36,7 @@ public abstract class FilterTemplate2 implements Filterer{ private TransferPair[] transfers; @Override - public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException{ + public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException { this.transfers = transfers; this.outgoingSelectionVector = outgoing.getSelectionVector2(); this.svMode = incoming.getSchema().getSelectionVectorMode(); @@ -60,7 +60,8 @@ public abstract class FilterTemplate2 implements Filterer{ } } - public void filterBatch(int recordCount){ + @Override + public void filterBatch(int recordCount) throws SchemaChangeException{ if (recordCount == 0) { return; } @@ -80,7 +81,7 @@ public abstract class FilterTemplate2 implements Filterer{ doTransfers(); } - private void filterBatchSV2(int recordCount){ + private void filterBatchSV2(int recordCount) throws SchemaChangeException { int svIndex = 0; final int count = recordCount; for(int i = 0; i < count; i++){ @@ -93,7 +94,7 @@ public abstract class FilterTemplate2 implements Filterer{ outgoingSelectionVector.setRecordCount(svIndex); } - private void filterBatchNoSV(int recordCount){ + private void filterBatchNoSV(int recordCount) throws SchemaChangeException { int svIndex = 0; for(int i = 0; i < recordCount; i++){ if(doEval(i, 0)){ @@ -104,7 +105,12 @@ public abstract class FilterTemplate2 implements Filterer{ outgoingSelectionVector.setRecordCount(svIndex); } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") RecordBatch incoming, + @Named("outgoing") RecordBatch outgoing) + throws SchemaChangeException; + public abstract boolean doEval(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) + throws SchemaChangeException; }
http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java index fd7a13f..aa45f54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,9 +27,8 @@ public interface Filterer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class); public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; - public void filterBatch(int recordCount); + public void filterBatch(int recordCount) throws SchemaChangeException; public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); - } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index a2b170d..bedf731 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -125,6 +125,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { return this.container; } + @SuppressWarnings("resource") private void setFlattenVector() { final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); @@ -266,6 +267,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { * the end of one of the other vectors while we are copying the data of the other vectors alongside each new flattened * value coming out of the repeated field.) */ + @SuppressWarnings("resource") private TransferPair getFlattenFieldTransferPair(FieldReference reference) { final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn()); final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass(); @@ -301,6 +303,9 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { final List<TransferPair> transfers = Lists.newArrayList(); final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.getCodeGenerator().plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.getCodeGenerator().saveCodeForDebugging(true); final IntHashSet transferFieldIds = new IntHashSet(); final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn())); @@ -349,6 +354,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { cg.addExpr(expr); } else{ // need to do evaluation. + @SuppressWarnings("resource") ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); allocationVectors.add(vector); TypedFieldId fid = container.add(vector); http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java index f40d924..dcef899 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -58,19 +58,16 @@ public abstract class FlattenTemplate implements Flattener { // this allows for groups to be written between batches if we run out of space, for cases where we have finished // a batch on the boundary it will be set to 0 - private int innerValueIndex; + private int innerValueIndex = -1; private int currentInnerValueIndex; - public FlattenTemplate() throws SchemaChangeException { - innerValueIndex = -1; - } - @Override public void setFlattenField(RepeatedValueVector flattenField) { this.fieldToFlatten = flattenField; this.accessor = RepeatedValueVector.RepeatedAccessor.class.cast(flattenField.getAccessor()); } + @Override public RepeatedValueVector getFlattenField() { return fieldToFlatten; } @@ -188,6 +185,8 @@ public abstract class FlattenTemplate implements Flattener { * and reduce the size of the currently used vectors. */ break outer; + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); } outputIndex++; currentInnerValueIndexLocal++; @@ -295,6 +294,9 @@ public abstract class FlattenTemplate implements Flattener { this.currentInnerValueIndex = 0; } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") RecordBatch incoming, + @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; + public abstract boolean doEval(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 18cfc78..23741b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -402,6 +402,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); final ClassGenerator<HashJoinProbe> g = cg.getRoot(); // Generate the code to project build side records http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 90f3f5f..a9bb479 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -266,6 +266,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{ final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.getCodeGenerator().plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.getCodeGenerator().saveCodeForDebugging(true); final ErrorCollector collector = new ErrorCollectorImpl(); // Generate members and initialization code http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index 9b935e8..2e92c8d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -223,6 +223,9 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> */ private NestedLoopJoin setupWorker() throws IOException, ClassTransformationException { final CodeGenerator<NestedLoopJoin> nLJCodeGenerator = CodeGenerator.get(NestedLoopJoin.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + nLJCodeGenerator.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// nLJCodeGenerator.saveCodeForDebugging(true); final ClassGenerator<NestedLoopJoin> nLJClassGenerator = nLJCodeGenerator.getRoot(); http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java index f2a95b8..090ca58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,9 +32,9 @@ public interface MergingReceiverGeneratorBase { VectorAccessible outgoing) throws SchemaChangeException; public abstract int doEval(int leftIndex, - int rightIndex); + int rightIndex) throws SchemaChangeException; - public abstract void doCopy(int inIndex, int outIndex); + public abstract void doCopy(int inIndex, int outIndex) throws SchemaChangeException; public static TemplateClassDefinition<MergingReceiverGeneratorBase> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, MergingReceiverTemplate.class); http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java index 537ae74..3bbfe95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,14 +26,16 @@ import org.apache.drill.exec.record.VectorAccessible; public abstract class MergingReceiverTemplate implements MergingReceiverGeneratorBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverTemplate.class); - public MergingReceiverTemplate() throws SchemaChangeException { } - + @Override public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException; + @Override public abstract int doEval(@Named("leftIndex") int leftIndex, - @Named("rightIndex") int rightIndex); + @Named("rightIndex") int rightIndex) throws SchemaChangeException; - public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + @Override + public abstract void doCopy(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index f7a3f22..ff3ac91 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -136,6 +136,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> this.outputCounts = new long[config.getNumSenders()]; } + @SuppressWarnings("resource") private RawFragmentBatch getNext(final int providerIndex) throws IOException { stats.startWait(); final RawFragmentBatchProvider provider = fragProviders[providerIndex]; @@ -194,7 +195,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // set up each (non-empty) incoming record batch final List<RawFragmentBatch> rawBatches = Lists.newArrayList(); int p = 0; - for (final RawFragmentBatchProvider provider : fragProviders) { + for (@SuppressWarnings("unused") final RawFragmentBatchProvider provider : fragProviders) { RawFragmentBatch rawBatch; // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema if (tempBatchHolder[p] != null) { @@ -316,7 +317,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> public int compare(final Node node1, final Node node2) { final int leftIndex = (node1.batchId << 16) + node1.valueIndex; final int rightIndex = (node2.batchId << 16) + node2.valueIndex; - return merger.doEval(leftIndex, rightIndex); + try { + return merger.doEval(leftIndex, rightIndex); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } } }); @@ -433,7 +438,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } // set the value counts in the outgoing vectors - for (final VectorWrapper vw : outgoingContainer) { + for (final VectorWrapper<?> vw : outgoingContainer) { vw.getValueVector().getMutator().setValueCount(outgoingPosition); } @@ -486,6 +491,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } tempBatchHolder[i] = batch; for (final SerializedField field : batch.getHeader().getDef().getFieldList()) { + @SuppressWarnings("resource") final ValueVector v = outgoingContainer.addOrGet(MaterializedField.create(field)); v.allocateNew(); } @@ -607,7 +613,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } private void allocateOutgoing() { - for (final VectorWrapper w : outgoingContainer) { + for (final VectorWrapper<?> w : outgoingContainer) { + @SuppressWarnings("resource") final ValueVector v = w.getValueVector(); if (v instanceof FixedWidthVector) { AllocationHelper.allocate(v, OUTGOING_BATCH_SIZE, 1); @@ -631,6 +638,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> try { final CodeGenerator<MergingReceiverGeneratorBase> cg = CodeGenerator.get(MergingReceiverGeneratorBase.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); final ClassGenerator<MergingReceiverGeneratorBase> g = cg.getRoot(); ExpandableHyperContainer batch = null; @@ -707,7 +717,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> assert ++outputCounts[node.batchId] <= inputCounts[node.batchId] : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]); final int inIndex = (node.batchId << 16) + node.valueIndex; - merger.doCopy(inIndex, outgoingPosition); + try { + merger.doCopy(inIndex, outgoingPosition); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } outgoingPosition++; if (outgoingPosition == OUTGOING_BATCH_SIZE) { return false; http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java index 3c4e9e1..d2853e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,8 +29,6 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.IntVector; import com.google.common.collect.ImmutableList; @@ -39,13 +37,13 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionProjectorTemplate.class); private ImmutableList<TransferPair> transfers; - private VectorContainer partitionVectors; +// private VectorContainer partitionVectors; private int partitions; - private SelectionVector2 vector2; - private SelectionVector4 vector4; +// private SelectionVector2 vector2; +// private SelectionVector4 vector4; private SelectionVectorMode svMode; private RecordBatch outBatch; - private SchemaPath outputField; +// private SchemaPath outputField; private IntVector partitionValues; public OrderedPartitionProjectorTemplate() throws SchemaChangeException{ @@ -54,8 +52,12 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit private int getPartition(int index) { //TODO replace this with binary search int partitionIndex = 0; - while (partitionIndex < partitions - 1 && doEval(index, partitionIndex) >= 0) { - partitionIndex++; + try { + while (partitionIndex < partitions - 1 && doEval(index, partitionIndex) >= 0) { + partitionIndex++; + } + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); } return partitionIndex; } @@ -81,7 +83,7 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit this.svMode = incoming.getSchema().getSelectionVectorMode(); this.outBatch = outgoing; - this.outputField = outputField; +// this.outputField = outputField; partitionValues = (IntVector) outBatch.getValueAccessorById(IntVector.class, outBatch.getValueVectorId(outputField).getFieldIds()).getValueVector(); switch(svMode){ case FOUR_BYTE: @@ -93,12 +95,12 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit doSetup(context, incoming, outgoing, partitionVectors); } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, - @Named("outgoing") RecordBatch outgoing, @Named("partitionVectors") VectorContainer partitionVectors); - public abstract int doEval(@Named("inIndex") int inIndex, @Named("partitionIndex") int partitionIndex); - - - - - + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") VectorAccessible incoming, + @Named("outgoing") RecordBatch outgoing, + @Named("partitionVectors") VectorContainer partitionVectors) + throws SchemaChangeException; + public abstract int doEval(@Named("inIndex") int inIndex, + @Named("partitionIndex") int partitionIndex) + throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index baceba4..fede487 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -87,8 +87,8 @@ import com.sun.codemodel.JExpr; public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class); - private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; - private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; +// private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; +// private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; public static final CacheConfig<String, CachedVectorContainer> SINGLE_CACHE_CONFIG = CacheConfig // .newBuilder(CachedVectorContainer.class) // @@ -141,6 +141,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart this.completionFactor = pop.getCompletionFactor(); DistributedCache cache = null; + // Clearly, this code is not used! this.mmap = cache.getMultiMap(MULTI_CACHE_CONFIG); this.tableMap = cache.getMap(SINGLE_CACHE_CONFIG); Preconditions.checkNotNull(tableMap); @@ -151,10 +152,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart SchemaPath outputPath = popConfig.getRef(); MaterializedField outputField = MaterializedField.create(outputPath.getAsNamePart().getName(), Types.required(TypeProtos.MinorType.INT)); this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator()); - } - @Override public void close() { super.close(); @@ -163,6 +162,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } + @SuppressWarnings("resource") private boolean saveSamples() throws SchemaChangeException, ClassTransformationException, IOException { recordsSampled = 0; IterOutcome upstream; @@ -249,8 +249,6 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } } return true; - - } /** @@ -342,6 +340,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Get all samples from distributed map + @SuppressWarnings("resource") SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator()); final VectorContainer allSamplesContainer = new VectorContainer(); final VectorContainer candidatePartitionTable = new VectorContainer(); @@ -360,6 +359,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } // sort the data incoming samples. + @SuppressWarnings("resource") SelectionVector4 newSv4 = containerBuilder.getSv4(); Sorter sorter = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer); sorter.setup(context, newSv4, allSamplesContainer); @@ -388,6 +388,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } } candidatePartitionTable.setRecordCount(copier.getOutputRecords()); + @SuppressWarnings("resource") WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false); wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator()); tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); @@ -421,6 +422,11 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart final ErrorCollector collector = new ErrorCollectorImpl(); final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + // Note: disabled for now. This may require some debugging: + // no tests are available for this operator. +// cg.getCodeGenerator().plainOldJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.getCodeGenerator().saveCodeForDebugging(true); int i = 0; for (Ordering od : orderings) { @@ -435,6 +441,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart "Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } + @SuppressWarnings("resource") ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); localAllocationVectors.add(vector); TypedFieldId fid = outgoing.add(vector); @@ -587,6 +594,11 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart final ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot( OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + // Note: disabled for now. This may require some debugging: + // no tests are available for this operator. +// cg.getCodeGenerator().plainOldJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.getCodeGenerator().saveCodeForDebugging(true); for (VectorWrapper<?> vw : batch) { TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator()); http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java index c0ba8f9..5c953b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java @@ -33,6 +33,7 @@ import com.google.common.collect.Lists; public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartitionSender> { + @SuppressWarnings("resource") @Override public RootExec getRoot(FragmentContext context, OrderedPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException { http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index b22fbda..92364e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -266,6 +266,9 @@ public class PartitionSenderRootExec extends BaseRootExec { final ClassGenerator<Partitioner> cg ; cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.getCodeGenerator().plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.getCodeGenerator().saveCodeForDebugging(true); ClassGenerator<Partitioner> cgInner = cg.getInnerGenerator("OutgoingRecordBatch"); final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 556460c..aa72c44 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -67,9 +67,6 @@ public abstract class PartitionerTemplate implements Partitioner { private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE; - public PartitionerTemplate() throws SchemaChangeException { - } - @Override public List<? extends PartitionOutgoingBatch> getOutgoingBatches() { return outgoingBatches; @@ -109,7 +106,7 @@ public abstract class PartitionerTemplate implements Partitioner { // create outgoingBatches only for subset of Destination Points if ( fieldId >= start && fieldId < end ) { logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId); - outgoingBatches.add(new OutgoingRecordBatch(stats, popConfig, + outgoingBatches.add(newOutgoingRecordBatch(stats, popConfig, context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId())); } fieldId++; @@ -137,6 +134,18 @@ public abstract class PartitionerTemplate implements Partitioner { } } + /** + * Shim method to be overridden in plain-old Java mode by the subclass to instantiate the + * generated inner class. Byte-code manipulation appears to fix up the byte codes + * directly. The name is special, it must be "new" + inner class name. + */ + + protected OutgoingRecordBatch newOutgoingRecordBatch( + OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel, + FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) { + return new OutgoingRecordBatch(stats, operator, tunnel, context, allocator, oppositeMinorFragmentId); + } + @Override public OperatorStats getStats() { return stats; @@ -202,7 +211,12 @@ public abstract class PartitionerTemplate implements Partitioner { * @throws IOException */ private void doCopy(int svIndex) throws IOException { - int index = doEval(svIndex); + int index; + try { + index = doEval(svIndex); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } if ( index >= start && index < end) { OutgoingRecordBatch outgoingBatch = outgoingBatches.get(index - start); outgoingBatch.copy(svIndex); @@ -210,14 +224,20 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override + public void initialize() { } + + @Override public void clear() { for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { outgoingRecordBatch.clear(); } } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; - public abstract int doEval(@Named("inIndex") int inIndex); + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") RecordBatch incoming, + @Named("outgoing") OutgoingRecordBatch[] outgoing) + throws SchemaChangeException; + public abstract int doEval(@Named("inIndex") int inIndex) throws SchemaChangeException; public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible { @@ -245,7 +265,11 @@ public abstract class PartitionerTemplate implements Partitioner { } protected void copy(int inIndex) throws IOException { - doEval(inIndex, recordCount); + try { + doEval(inIndex, recordCount); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } recordCount++; totalRecords++; if (recordCount == outgoingRecordBatchSize) { @@ -260,10 +284,12 @@ public abstract class PartitionerTemplate implements Partitioner { } @RuntimeOverridden - protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {}; + protected void doSetup(@Named("incoming") RecordBatch incoming, + @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException { }; @RuntimeOverridden - protected void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { }; + protected void doEval(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) throws SchemaChangeException { }; public void flush(boolean schemaChanged) throws IOException { if (dropAll) { @@ -350,12 +376,17 @@ public abstract class PartitionerTemplate implements Partitioner { public void initializeBatch() { for (VectorWrapper<?> v : incoming) { // create new vector + @SuppressWarnings("resource") ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator); outgoingVector.setInitialCapacity(outgoingRecordBatchSize); vectorContainer.add(outgoingVector); } allocateOutgoingRecordBatch(); - doSetup(incoming, vectorContainer); + try { + doSetup(incoming, vectorContainer); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } } public void resetBatch() { http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 1227e41..1ecdaf5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -322,6 +322,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { final List<TransferPair> transfers = Lists.newArrayList(); final ClassGenerator<Projector> cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.getCodeGenerator().plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.getCodeGenerator().saveCodeForDebugging(true); final IntHashSet transferFieldIds = new IntHashSet(); @@ -481,7 +484,11 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } try { - this.projector = context.getImplementationClass(cg.getCodeGenerator()); + CodeGenerator<Projector> codeGen = cg.getCodeGenerator(); + codeGen.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// codeGen.saveCodeForDebugging(true); + this.projector = context.getImplementationClass(codeGen); projector.setup(context, incoming, this, transfers); } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java index a6294d8..9011e1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -39,7 +39,7 @@ public abstract class ProjectorTemplate implements Projector { private SelectionVector4 vector4; private SelectionVectorMode svMode; - public ProjectorTemplate() throws SchemaChangeException { + public ProjectorTemplate() { } @Override @@ -51,7 +51,11 @@ public abstract class ProjectorTemplate implements Projector { case TWO_BYTE: final int count = recordCount; for (int i = 0; i < count; i++, firstOutputIndex++) { - doEval(vector2.getIndex(i), firstOutputIndex); + try { + doEval(vector2.getIndex(i), firstOutputIndex); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } } return recordCount; @@ -59,7 +63,11 @@ public abstract class ProjectorTemplate implements Projector { final int countN = recordCount; int i; for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) { - doEval(i, firstOutputIndex); + try { + doEval(i, firstOutputIndex); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } } if (i < startIndex + recordCount || startIndex > 0) { for (TransferPair t : transfers) { @@ -93,7 +101,11 @@ public abstract class ProjectorTemplate implements Projector { doSetup(context, incoming, outgoing); } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") RecordBatch incoming, + @Named("outgoing") RecordBatch outgoing) + throws SchemaChangeException; + public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) + throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 689607e..152cabb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -162,6 +162,12 @@ public class SortBatch extends AbstractRecordBatch<Sort> { public static Sorter createNewSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) throws ClassTransformationException, IOException, SchemaChangeException{ CodeGenerator<Sorter> cg = CodeGenerator.get(Sorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + // This operator may be deprecated. No tests exercise it. + // There is no way, at present, to verify if the generated code + // works with Plain-old Java. +// cg.plainOldJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); ClassGenerator<Sorter> g = cg.getRoot(); g.setMappingSet(mainMapping); http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java index 8ead6ab..9e265d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,7 +27,5 @@ public interface Copier { public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class); public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException; - public abstract int copyRecords(int index, int recordCount); - - -} \ No newline at end of file + public abstract int copyRecords(int index, int recordCount) throws SchemaChangeException; +} http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java index d2b94c5..bdee8ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -43,7 +43,7 @@ public abstract class CopierTemplate2 implements Copier{ } @Override - public int copyRecords(int index, int recordCount){ + public int copyRecords(int index, int recordCount) throws SchemaChangeException { for(VectorWrapper<?> out : outgoing){ MajorType type = out.getField().getType(); if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { @@ -61,8 +61,12 @@ public abstract class CopierTemplate2 implements Copier{ return outgoingPosition; } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") RecordBatch incoming, + @Named("outgoing") RecordBatch outgoing) + throws SchemaChangeException; + public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) + throws SchemaChangeException; http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java index 57c2e36..1ae7df9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -44,7 +44,7 @@ public abstract class CopierTemplate4 implements Copier{ @Override - public int copyRecords(int index, int recordCount){ + public int copyRecords(int index, int recordCount) throws SchemaChangeException { for(VectorWrapper<?> out : outgoing){ MajorType type = out.getField().getType(); if (!Types.isFixedWidthType(type) || Types.isRepeated(type)) { @@ -62,9 +62,11 @@ public abstract class CopierTemplate4 implements Copier{ return outgoingPosition; } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - - - + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") RecordBatch incoming, + @Named("outgoing") RecordBatch outgoing) + throws SchemaChangeException; + public abstract void doEval(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) + throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 799bf7f..b875b66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -34,7 +34,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; -import org.apache.drill.exec.util.CallBack; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; @@ -97,7 +96,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect @Override protected IterOutcome doWork() { int incomingRecordCount = incoming.getRecordCount(); - int copiedRecords = copier.copyRecords(0, incomingRecordCount); + int copiedRecords; + try { + copiedRecords = copier.copyRecords(0, incomingRecordCount); + } catch (SchemaChangeException e) { + throw new IllegalStateException(e); + } if (copiedRecords < incomingRecordCount) { for(VectorWrapper<?> v : container){ @@ -136,9 +140,13 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect int recordCount = incoming.getRecordCount(); int remainingRecordCount = incoming.getRecordCount() - remainderIndex; int copiedRecords; - while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) { - logger.debug("Copied zero records. Retrying"); - container.zeroVectors(); + try { + while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) { + logger.debug("Copied zero records. Retrying"); + container.zeroVectors(); + } + } catch (SchemaChangeException e) { + throw new IllegalStateException(e); } /* @@ -222,7 +230,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE); for(VectorWrapper<?> vv : incoming){ - TransferPair tp = vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); + vv.getValueVector().makeTransferPair(container.addOrGet(vv.getField(), callBack)); } try { @@ -230,6 +238,9 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect CopyUtil.generateCopies(cg.getRoot(), incoming, false); Copier copier = context.getImplementationClass(cg); copier.setupRemover(context, incoming, this); + cg.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); return copier; } catch (ClassTransformationException | IOException e) { @@ -245,6 +256,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{ for(VectorWrapper<?> vv : batch){ + @SuppressWarnings("resource") ValueVector v = vv.getValueVectors()[0]; v.makeTransferPair(container.addOrGet(v.getField(), callBack)); } @@ -252,9 +264,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect try { final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry(), context.getOptions()); CopyUtil.generateCopies(cg.getRoot(), batch, true); + cg.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); Copier copier = context.getImplementationClass(cg); copier.setupRemover(context, batch, outgoing); - return copier; } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); @@ -265,7 +279,4 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect public WritableBatch getWritableBatch() { return WritableBatch.get(this); } - - - } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index cff2abd..06b7bdb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -152,6 +152,7 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { return true; } + @SuppressWarnings("resource") private IterOutcome doWork() throws ClassTransformationException, IOException, SchemaChangeException { if (allocationVectors != null) { for (ValueVector v : allocationVectors) { @@ -180,11 +181,13 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { return IterOutcome.OK_NEW_SCHEMA; } - final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.getCodeGenerator().plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.getCodeGenerator().saveCodeForDebugging(true); int index = 0; for(VectorWrapper<?> vw : current) { - ValueVector vvIn = vw.getValueVector(); + ValueVector vvIn = vw.getValueVector(); // get the original input column names SchemaPath inputPath = SchemaPath.getSimplePath(vvIn.getField().getPath()); // get the renamed column names http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java index fdccdb6..a1fe727 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllerTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -32,15 +32,14 @@ public abstract class UnionAllerTemplate implements UnionAller { private ImmutableList<TransferPair> transfers; - public UnionAllerTemplate() throws SchemaChangeException { - - } - @Override public final int unionRecords(int startIndex, final int recordCount, int firstOutputIndex) { - int i; - for (i = startIndex; i < startIndex + recordCount; i++, firstOutputIndex++) { - doEval(i, firstOutputIndex); + try { + for (int i = startIndex; i < startIndex + recordCount; i++, firstOutputIndex++) { + doEval(i, firstOutputIndex); + } + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); } for (TransferPair t : transfers) { @@ -50,11 +49,16 @@ public abstract class UnionAllerTemplate implements UnionAller { } @Override - public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{ + public final void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{ this.transfers = ImmutableList.copyOf(transfers); doSetup(context, incoming, outgoing); } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") RecordBatch incoming, + @Named("outgoing") RecordBatch outgoing) + throws SchemaChangeException; + public abstract void doEval(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) + throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java index 21dfbba..55c27c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/NoFrameSupportTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -117,7 +117,11 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { private void cleanPartition() { partition = null; - resetValues(); + try { + resetValues(); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } for (VectorWrapper<?> vw : internal) { if ((vw.getValueVector() instanceof BaseDataValueVector)) { ((BaseDataValueVector) vw.getValueVector()).reset(); @@ -173,15 +177,23 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { private void copyPrevToInternal(VectorAccessible current, int row) { logger.trace("copying {} into internal", row - 1); - setupCopyPrev(current, internal); - copyPrev(row - 1, 0); + try { + setupCopyPrev(current, internal); + copyPrev(row - 1, 0); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } lagCopiedToInternal = true; } private void copyPrevFromInternal() { if (lagCopiedToInternal) { - setupCopyFromInternal(internal, container); - copyFromInternal(0, 0); + try { + setupCopyFromInternal(internal, container); + copyFromInternal(0, 0); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } lagCopiedToInternal = false; } } @@ -218,8 +230,12 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { // check first container from start row, and subsequent containers from first row for (; row < recordCount; row++, length++) { - if (!isSamePartition(start, current, row, batch)) { - break outer; + try { + if (!isSamePartition(start, current, row, batch)) { + break outer; + } + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); } } @@ -231,11 +247,15 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { row = 0; } - if (!requireFullPartition) { - // this is the last batch of current partition if - lastBatch = row < outputCount // partition ends before the end of the batch - || batches.size() == 1 // it's the last available batch - || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition + try { + if (!requireFullPartition) { + // this is the last batch of current partition if + lastBatch = row < outputCount // partition ends before the end of the batch + || batches.size() == 1 // it's the last available batch + || !isSamePartition(start, current, 0, batches.get(1)); // next batch contains a different partition + } + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); } partition.updateLength(length, !(requireFullPartition || lastBatch)); @@ -284,7 +304,9 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { * @param outIndex index of row * @param partition object used by "computed" window functions */ - public abstract void outputRow(@Named("outIndex") int outIndex, @Named("partition") Partition partition); + public abstract void outputRow(@Named("outIndex") int outIndex, + @Named("partition") Partition partition) + throws SchemaChangeException; /** * Called once per partition, before processing the partition. Used to setup read/write vectors @@ -294,7 +316,8 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { * @throws SchemaChangeException */ public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming, - @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException; + @Named("outgoing") VectorAccessible outgoing) + throws SchemaChangeException; /** * copies value(s) from inIndex row to outIndex row. Mostly used by LEAD. inIndex always points to the row next to @@ -302,8 +325,12 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { * @param inIndex source row of the copy * @param outIndex destination row of the copy. */ - public abstract void copyNext(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - public abstract void setupCopyNext(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); + public abstract void copyNext(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) + throws SchemaChangeException; + public abstract void setupCopyNext(@Named("incoming") VectorAccessible incoming, + @Named("outgoing") VectorAccessible outgoing) + throws SchemaChangeException; /** * copies value(s) from inIndex row to outIndex row. Mostly used by LAG. inIndex always points to the previous row @@ -311,16 +338,24 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { * @param inIndex source row of the copy * @param outIndex destination row of the copy. */ - public abstract void copyPrev(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - public abstract void setupCopyPrev(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); - - public abstract void copyFromInternal(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - public abstract void setupCopyFromInternal(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); + public abstract void copyPrev(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) + throws SchemaChangeException; + public abstract void setupCopyPrev(@Named("incoming") VectorAccessible incoming, + @Named("outgoing") VectorAccessible outgoing) + throws SchemaChangeException; + + public abstract void copyFromInternal(@Named("inIndex") int inIndex, + @Named("outIndex") int outIndex) + throws SchemaChangeException; + public abstract void setupCopyFromInternal(@Named("incoming") VectorAccessible incoming, + @Named("outgoing") VectorAccessible outgoing) + throws SchemaChangeException; /** * reset all window functions */ - public abstract boolean resetValues(); + public abstract boolean resetValues() throws SchemaChangeException; /** * compares two rows from different batches (can be the same), if they have the same value for the partition by @@ -331,8 +366,12 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { * @param b2 batch for second row * @return true if the rows are in the same partition */ - public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, - @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); + @Override + public abstract boolean isSamePartition(@Named("b1Index") int b1Index, + @Named("b1") VectorAccessible b1, + @Named("b2Index") int b2Index, + @Named("b2") VectorAccessible b2) + throws SchemaChangeException; /** * compares two rows from different batches (can be the same), if they have the same value for the order by @@ -343,6 +382,10 @@ public abstract class NoFrameSupportTemplate implements WindowFramer { * @param b2 batch for second row * @return true if the rows are in the same partition */ - public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, - @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); + @Override + public abstract boolean isPeer(@Named("b1Index") int b1Index, + @Named("b1") VectorAccessible b1, + @Named("b2Index") int b2Index, + @Named("b2") VectorAccessible b2) + throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index 2404393..989ea96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -40,6 +40,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.physical.impl.project.Projector; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; @@ -208,13 +209,19 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { final VectorAccessible last = batches.get(batches.size() - 1); final int lastSize = last.getRecordCount(); - final boolean partitionEndReached = !framers[0].isSamePartition(currentSize - 1, current, lastSize - 1, last); - final boolean frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last); + boolean partitionEndReached; + boolean frameEndReached; + try { + partitionEndReached = !framers[0].isSamePartition(currentSize - 1, current, lastSize - 1, last); + frameEndReached = partitionEndReached || !framers[0].isPeer(currentSize - 1, current, lastSize - 1, last); - for (final WindowFunction function : functions) { - if (!function.canDoWork(batches.size(), popConfig, frameEndReached, partitionEndReached)) { - return false; + for (final WindowFunction function : functions) { + if (!function.canDoWork(batches.size(), popConfig, frameEndReached, partitionEndReached)) { + return false; + } } + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); } return true; @@ -353,8 +360,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { } cg.getBlock("resetValues")._return(JExpr.TRUE); + CodeGenerator<WindowFramer> codeGen = cg.getCodeGenerator(); + codeGen.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// codeGen.saveCodeForDebugging(true); - return context.getImplementationClass(cg); + return context.getImplementationClass(codeGen); } /** http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java index 3d2d0fc..a7964d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFramer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -57,8 +57,11 @@ public interface WindowFramer { * @param b2 batch for second row * @return true if the rows are in the same partition */ - boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, - @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); + boolean isSamePartition(@Named("b1Index") int b1Index, + @Named("b1") VectorAccessible b1, + @Named("b2Index") int b2Index, + @Named("b2") VectorAccessible b2) + throws SchemaChangeException; /** * compares two rows from different batches (can be the same), if they have the same value for the order by @@ -69,6 +72,9 @@ public interface WindowFramer { * @param b2 batch for second row * @return true if the rows are in the same partition */ - boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1, - @Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2); + boolean isPeer(@Named("b1Index") int b1Index, + @Named("b1") VectorAccessible b1, + @Named("b2Index") int b2Index, + @Named("b2") VectorAccessible b2) + throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 95d64bd..8fe05f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -227,7 +227,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { if (mSorter != null) { mSorter.clear(); } - for(Iterator iter = this.currSpillDirs.iterator(); iter.hasNext(); iter.remove()) { + for(Iterator<Path> iter = this.currSpillDirs.iterator(); iter.hasNext(); iter.remove()) { Path path = (Path)iter.next(); try { if (fs != null && path != null && fs.exists(path)) { @@ -254,6 +254,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { case OK: case OK_NEW_SCHEMA: for (VectorWrapper<?> w : incoming) { + @SuppressWarnings("resource") ValueVector v = container.addOrGet(w.getField()); if (v instanceof AbstractContainerVector) { w.getValueVector().makeTransferPair(v); // Can we remove this hack? @@ -278,6 +279,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } } + @SuppressWarnings("resource") @Override public IterOutcome innerNext() { if (schema != null) { @@ -539,6 +541,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { if (batchGroups.size() == 0) { break; } + @SuppressWarnings("resource") BatchGroup batch = batchGroups.pollLast(); assert batch != null : "Encountered a null batch during merge and spill operation"; batchGroupList.add(batch); @@ -610,9 +613,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } private SelectionVector2 newSV2() throws OutOfMemoryException, InterruptedException { + @SuppressWarnings("resource") SelectionVector2 sv2 = new SelectionVector2(oAllocator); if (!sv2.allocateNewSafe(incoming.getRecordCount())) { try { + @SuppressWarnings("resource") final BatchGroup merged = mergeAndSpill(batchGroups); if (merged != null) { spilledBatchGroups.add(merged); @@ -711,19 +716,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { g.rotateBlock(); g.getEvalBlock()._return(JExpr.lit(0)); - cg.plainOldJavaCapable(true); // This class can generate plain-old Java. + cg.plainJavaCapable(true); // This class can generate plain-old Java. // Uncomment out this line to debug the generated code. -// cg.preferPlainOldJava(true); +// cg.saveCodeForDebugging(true); return context.getImplementationClass(cg); } public SingleBatchSorter createNewSorter(FragmentContext context, VectorAccessible batch) throws ClassTransformationException, IOException, SchemaChangeException{ CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); - cg.plainOldJavaCapable(true); // This class can generate plain-old Java. + cg.plainJavaCapable(true); // This class can generate plain-old Java. // Uncomment out this line to debug the generated code. -// cg.preferPlainOldJava(true); +// cg.saveCodeForDebugging(true); generateComparisons(cg.getRoot(), batch); return context.getImplementationClass(cg); } @@ -767,6 +772,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { try { if (copier == null) { CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); + cg.plainJavaCapable(true); + // Uncomment out this line to debug the generated code. +// cg.saveCodeForDebugging(true); ClassGenerator<PriorityQueueCopier> g = cg.getRoot(); generateComparisons(g, batch); @@ -779,8 +787,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { copier.close(); } + @SuppressWarnings("resource") BufferAllocator allocator = spilling ? copierAllocator : oAllocator; for (VectorWrapper<?> i : batch) { + @SuppressWarnings("resource") ValueVector v = TypeHelper.getNewVector(i.getField(), allocator); outputContainer.add(v); } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 3ed9cd0..34aa46a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -180,7 +180,11 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { final int sv1 = vector4.get(leftIndex); final int sv2 = vector4.get(rightIndex); compares++; - return doEval(sv1, sv2); + try { + return doEval(sv1, sv2); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException(e); + } } @Override @@ -194,6 +198,11 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { } } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing); - public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex); + public abstract void doSetup(@Named("context") FragmentContext context, + @Named("incoming") VectorContainer incoming, + @Named("outgoing") RecordBatch outgoing) + throws SchemaChangeException; + public abstract int doEval(@Named("leftIndex") int leftIndex, + @Named("rightIndex") int rightIndex) + throws SchemaChangeException; } http://git-wip-us.apache.org/repos/asf/drill/blob/ee399317/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java index e59d1b1..fc28fc3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; @@ -29,5 +28,4 @@ public interface SingleBatchSorter { public void sort(SelectionVector2 vector2) throws SchemaChangeException; public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<SingleBatchSorter>(SingleBatchSorter.class, SingleBatchSorterTemplate.class); - }
