DRILL-800: Fix Partitioner dropping records that can't fit in the available space of ValueVectors in OutgoingRecordBatch.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6dd3ff9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6dd3ff9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6dd3ff9c Branch: refs/heads/master Commit: 6dd3ff9cb3fbfe4748cfe6f64d4ba1d4ecc404f2 Parents: ef28054 Author: vkorukanti <[email protected]> Authored: Thu May 22 18:26:38 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed May 28 09:13:58 2014 -0700 ---------------------------------------------------------------------- .../partitionsender/OutgoingRecordBatch.java | 20 ++--- .../PartitionSenderRootExec.java | 36 ++++----- .../impl/partitionsender/Partitioner.java | 5 -- .../partitionsender/PartitionerSV2Template.java | 60 --------------- .../partitionsender/PartitionerSV4Template.java | 61 ---------------- .../partitionsender/PartitionerTemplate.java | 77 +++++++++++++++++--- 6 files changed, 89 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 646f1a0..c86da8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -71,10 +71,9 @@ public class OutgoingRecordBatch implements VectorAccessible { private volatile boolean ok = true; private BatchSchema outSchema; private 
int recordCount; - private int recordCapacity; - private static int DEFAULT_ALLOC_SIZE = 20000; - private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048; private OperatorStats stats; + private static final int DEFAULT_RECORD_BATCH_SIZE = 20000; + private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200; public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming, FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) { @@ -89,9 +88,8 @@ public class OutgoingRecordBatch implements VectorAccessible { } public void flushIfNecessary() { - if (recordCount == recordCapacity) logger.debug("Flush is necesary: Count is " + recordCount + ", capacity is " + recordCapacity); try { - if (recordCount == recordCapacity){ + if (recordCount == DEFAULT_RECORD_BATCH_SIZE) { flush(); stats.addLongStat(PartitionSenderStats.BATCHES_SENT, 1l); stats.addLongStat(PartitionSenderStats.RECORDS_SENT, recordCount); @@ -159,8 +157,8 @@ public class OutgoingRecordBatch implements VectorAccessible { recordCount = 0; vectorContainer.zeroVectors(); for (VectorWrapper<?> v : vectorContainer) { -// logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush."); - VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE); +// logger.debug("Reallocating vv to capacity " + DEFAULT_RECORD_BATCH_SIZE + " after flush."); + VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); } if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); } return true; @@ -173,7 +171,6 @@ public class OutgoingRecordBatch implements VectorAccessible { public void initializeBatch() { isLast = false; vectorContainer.clear(); - recordCapacity = DEFAULT_ALLOC_SIZE; SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); for 
(VectorWrapper<?> v : incoming) { @@ -183,12 +180,12 @@ public class OutgoingRecordBatch implements VectorAccessible { // allocate a new value vector ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator); - VectorAllocator.getAllocator(outgoingVector, 100).alloc(recordCapacity); + VectorAllocator.getAllocator(outgoingVector, DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); vectorContainer.add(outgoingVector); -// logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector()); +// logger.debug("Reallocating to cap " + DEFAULT_RECORD_BATCH_SIZE + " because of newly init'd vector : " + v.getValueVector()); } outSchema = bldr.build(); -// logger.debug("Initialized OutgoingRecordBatch. RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema); +// logger.debug("Initialized OutgoingRecordBatch. RecordCount: " + recordCount + ", cap: " + DEFAULT_RECORD_BATCH_SIZE + " Schema: " + outSchema); } /** @@ -198,7 +195,6 @@ public class OutgoingRecordBatch implements VectorAccessible { public void resetBatch() { isLast = false; recordCount = 0; - recordCapacity = 0; for (VectorWrapper<?> v : vectorContainer){ v.getValueVector().clear(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 7893be8..d0eaf9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -187,20 +187,7 @@ public class PartitionSenderRootExec implements RootExec { boolean hyper = false; - switch(incoming.getSchema().getSelectionVectorMode()){ - case NONE: - cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - break; - case TWO_BYTE: - cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV2, context.getFunctionRegistry()); - break; - case FOUR_BYTE: - cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION_SV4, context.getFunctionRegistry()); - hyper = true; - break; - default: - throw new UnsupportedOperationException(); - } + cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry()); final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); if (collector.hasErrors()) { @@ -285,10 +272,12 @@ public class PartitionSenderRootExec implements RootExec { .component(bucket)) .component(JExpr.lit(fieldId)))) .invoke("copyFromSafe") - .arg(inIndex) - .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) - .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) - ._return(); + .arg(inIndex) + .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) + .arg(incomingVV).not()) + ._then() + .add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) + ._return(JExpr.lit(false)); } else { // the following block generates calls to copyFrom(); e.g.: // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex, @@ -301,10 +290,12 @@ public class PartitionSenderRootExec implements RootExec { .component(bucket)) .component(JExpr.lit(fieldId)))) .invoke("copyFromSafe") - .arg(inIndex) - .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) - 
.arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) - ._return(); + .arg(inIndex) + .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) + .arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not()) + ._then() + .add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) + ._return(JExpr.lit(false)); } ++fieldId; @@ -312,6 +303,7 @@ public class PartitionSenderRootExec implements RootExec { // generate the OutgoingRecordBatch helper invocations cg.getEvalBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("incRecordCount")); cg.getEvalBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("flushIfNecessary")); + cg.getEvalBlock()._return(JExpr.lit(true)); try { // compile and setup generated code // partitioner = context.getImplementationClassMultipleOutput(cg); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 3ffead0..7d3998b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -31,9 +31,4 @@ public interface Partitioner { public abstract void partitionBatch(RecordBatch incoming); public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); - - public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV2 = new 
TemplateClassDefinition<>(Partitioner.class, PartitionerSV2Template.class); - - public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION_SV4 = new TemplateClassDefinition<>(Partitioner.class, PartitionerSV4Template.class); - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java deleted file mode 100644 index 981055a..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV2Template.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.physical.impl.partitionsender; - -import javax.inject.Named; - -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.selection.SelectionVector2; - -public abstract class PartitionerSV2Template implements Partitioner { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV2Template.class); - - private SelectionVector2 sv2; - - public PartitionerSV2Template() throws SchemaChangeException { - } - - @Override - public final void setup(FragmentContext context, - RecordBatch incoming, - OutgoingRecordBatch[] outgoing) throws SchemaChangeException { - - this.sv2 = incoming.getSelectionVector2(); - - doSetup(context, incoming, outgoing); - - } - - @Override - public void partitionBatch(RecordBatch incoming) { - - for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { - // for each record - doEval(sv2.getIndex(recordId), 0); - } - - } - - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java deleted file mode 100644 index 2e00f9b..0000000 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerSV4Template.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.partitionsender; - -import javax.inject.Named; - -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.selection.SelectionVector2; -import org.apache.drill.exec.record.selection.SelectionVector4; - -public abstract class PartitionerSV4Template implements Partitioner { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerSV4Template.class); - - private SelectionVector4 sv4; - - public PartitionerSV4Template() throws SchemaChangeException { - } - - @Override - public final void setup(FragmentContext context, - RecordBatch incoming, - OutgoingRecordBatch[] outgoing) throws SchemaChangeException { - - this.sv4 = incoming.getSelectionVector4(); - - doSetup(context, incoming, outgoing); - - } - - @Override - public void partitionBatch(RecordBatch incoming) { - - for (int recordId = 0; recordId < 
incoming.getRecordCount(); ++recordId) { - // for each record - doEval(sv4.get(recordId), 0); - } - - } - - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6dd3ff9c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 1b0aad2..fe62b73 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -19,13 +19,24 @@ package org.apache.drill.exec.physical.impl.partitionsender; import javax.inject.Named; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; public abstract class PartitionerTemplate implements Partitioner { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerTemplate.class); + private SelectionVector2 sv2; + private SelectionVector4 sv4; + + private static final String REWRITE_MSG = "Failed to write the 
record {} in available space. Attempting to rewrite."; + private static final String RECORD_TOO_BIG_MSG = "Record {} is too big to fit into the allocated memory of ValueVector."; + public PartitionerTemplate() throws SchemaChangeException { } @@ -36,24 +47,70 @@ public abstract class PartitionerTemplate implements Partitioner { doSetup(context, incoming, outgoing); + SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); + switch(svMode){ + case FOUR_BYTE: + this.sv4 = incoming.getSelectionVector4(); + break; + + case TWO_BYTE: + this.sv2 = incoming.getSelectionVector2(); + break; + + case NONE: + break; + + default: + throw new UnsupportedOperationException("Unknown selection vector mode: " + svMode.toString()); + } } @Override public void partitionBatch(RecordBatch incoming) { + SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); - for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { - // for each record + // Keeping the for loop inside the case to avoid case evaluation for each record. 
+ switch(svMode) { + case NONE: + for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { + if (!doEval(recordId)) { + logger.trace(REWRITE_MSG, recordId); + if (!doEval(recordId)) { + logger.debug(RECORD_TOO_BIG_MSG, recordId); + } + } + } + break; - // TODO: if attempting to insert too large of a value into a vector: - // - send the batch - // - reallocate (at least the size of the current value) and try again - doEval(recordId, 0); - } + case TWO_BYTE: + for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { + int svIndex = sv2.getIndex(recordId); + if (!doEval(svIndex)) { + logger.trace(REWRITE_MSG, recordId); + if (!doEval(svIndex)) { + logger.debug(RECORD_TOO_BIG_MSG, recordId); + } + } + } + break; + + case FOUR_BYTE: + for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { + int svIndex = sv4.get(recordId); + if (!doEval(svIndex)) { + logger.trace(REWRITE_MSG, recordId); + if (!doEval(svIndex)) { + logger.debug(RECORD_TOO_BIG_MSG, recordId); + } + } + } + break; + default: + throw new UnsupportedOperationException("Unknown selection vector mode: " + svMode.toString()); + } } public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); - - + public abstract boolean doEval(@Named("inIndex") int inIndex); }
