Optimized generated code for partition sender
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8ffc674b Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8ffc674b Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8ffc674b Branch: refs/heads/master Commit: 8ffc674b7ce699aa6905166bf2060bc4e48e45c3 Parents: 4a10ea1 Author: Ben Becker <[email protected]> Authored: Sat Aug 10 01:20:53 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 15 18:31:31 2013 -0700 ---------------------------------------------------------------------- .../partitionsender/OutgoingRecordBatch.java | 52 ++++----- .../PartitionSenderRootExec.java | 115 ++++++++++++------- .../exec/work/RemoteFragmentRunnerListener.java | 11 +- 3 files changed, 107 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 4ab598c..b40ce4c 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -19,11 +19,10 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.util.Iterator; -import java.util.List; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import 
org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.proto.ExecProtos; @@ -52,7 +51,6 @@ public class OutgoingRecordBatch implements RecordBatch { private RecordBatch incoming; private FragmentContext context; private BatchSchema outSchema; - private List<ValueVector> valueVectors; private VectorContainer vectorContainer; private int recordCount; private int recordCapacity; @@ -65,29 +63,25 @@ public class OutgoingRecordBatch implements RecordBatch { initializeBatch(); } - public OutgoingRecordBatch() { } - - public void init(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context) { - this.incoming = incoming; - this.context = context; - this.operator = operator; - this.tunnel = tunnel; - resetBatch(); - } - public void flushIfNecessary() { - if (recordCount == recordCapacity - 1) flush(); + if (recordCount == recordCapacity) logger.debug("Flush is necesary: Count is " + recordCount + ", capacity is " + recordCapacity); + try { + if (recordCount == recordCapacity) flush(); + } catch (SchemaChangeException e) { + // TODO: + logger.error("Unable to flush outgoing record batch: " + e); + } } public void incRecordCount() { ++recordCount; } - public void flush() { + public void flush() throws SchemaChangeException { if (recordCount == 0) { logger.warn("Attempted to flush an empty record batch"); - return; } + logger.debug("Flushing record batch. 
count is: " + recordCount + ", capacity is " + recordCapacity); final ExecProtos.FragmentHandle handle = context.getHandle(); FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, handle.getQueryId(), @@ -96,23 +90,26 @@ public class OutgoingRecordBatch implements RecordBatch { operator.getOppositeMajorFragmentId(), 0, getWritableBatch()); - tunnel.sendRecordBatch(statusHandler, context, writableBatch); + tunnel.sendRecordBatch(statusHandler, context, writableBatch); // reset values and reallocate the buffer for each value vector. NOTE: the value vector is directly // referenced by generated code and must not be replaced. recordCount = 0; for (VectorWrapper v : vectorContainer) { - getAllocator(TypeHelper.getNewVector(v.getField(), context.getAllocator()), - v.getValueVector()).alloc(recordCapacity); + logger.debug("Reallocating vv to capacity " + recordCapacity + " after flush. " + v.getValueVector()); + getAllocator(v.getValueVector(), + TypeHelper.getNewVector(v.getField(), context.getAllocator())).alloc(recordCapacity); } + if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); } } + /** * Create a new output schema and allocate space for value vectors based on the incoming record batch. 
*/ public void initializeBatch() { + isLast = false; recordCapacity = incoming.getRecordCount(); - valueVectors = Lists.newArrayList(); vectorContainer = new VectorContainer(); SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); @@ -122,12 +119,13 @@ public class OutgoingRecordBatch implements RecordBatch { bldr.addField(v.getField()); // allocate a new value vector - vectorContainer.add(v.getValueVector()); ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator()); - getAllocator(outgoingVector, v.getValueVector()).alloc(recordCapacity); - valueVectors.add(outgoingVector); + getAllocator(v.getValueVector(), outgoingVector).alloc(recordCapacity); + vectorContainer.add(outgoingVector); + logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector()); } outSchema = bldr.build(); + logger.debug("Initialized OutgoingRecordBatch. RecordCount: " + recordCount + ", cap: " + recordCapacity + " Schema: " + outSchema); } /** @@ -135,13 +133,11 @@ public class OutgoingRecordBatch implements RecordBatch { * on the incoming record batch. 
*/ public void resetBatch() { + isLast = false; recordCount = 0; recordCapacity = 0; - if (valueVectors != null) { - for(ValueVector v : valueVectors){ - v.close(); - } - } + for (VectorWrapper v : vectorContainer) + v.getValueVector().clear(); initializeBatch(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 313ddf3..aa25c96 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.physical.impl.partitionsender; -import com.beust.jcommander.internal.Lists; import com.sun.codemodel.*; import org.apache.drill.common.expression.*; import org.apache.drill.exec.exception.ClassTransformationException; @@ -32,9 +31,10 @@ import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.TypeHelper; +import org.apache.drill.exec.vector.ValueVector; import java.io.IOException; -import java.util.List; class PartitionSenderRootExec implements RootExec { @@ -56,10 +56,10 @@ class PartitionSenderRootExec implements RootExec { this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()]; int fieldId = 0; for 
(CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) - outgoing[fieldId] = new OutgoingRecordBatch(operator, - context.getCommunicator().getTunnel(endpoint), - incoming, - context); + outgoing[fieldId++] = new OutgoingRecordBatch(operator, + context.getCommunicator().getTunnel(endpoint), + incoming, + context); try { createPartitioner(); } catch (SchemaChangeException e) { @@ -87,19 +87,25 @@ class PartitionSenderRootExec implements RootExec { partitioner.partitionBatch(incoming); // send all pending batches - flushOutgoingBatches(true, false); + try { + flushOutgoingBatches(true, false); + } catch (SchemaChangeException e) { + incoming.kill(); + logger.error("Error while creating partitioning sender or flushing outgoing batches", e); + context.fail(e); + return false; + } return false; case OK_NEW_SCHEMA: - // send all existing batches - flushOutgoingBatches(false, true); - // update OutgoingRecordBatch's schema and value vectors try { + // send all existing batches + flushOutgoingBatches(false, true); + // update OutgoingRecordBatch's schema and generate partitioning code createPartitioner(); - partitioner.setup(context, incoming, outgoing); } catch (SchemaChangeException e) { incoming.kill(); - logger.error("Failed to create partitioning sender during query ", e); + logger.error("Error while creating partitioning sender or flushing outgoing batches", e); context.fail(e); return false; } @@ -133,54 +139,85 @@ class PartitionSenderRootExec implements RootExec { } // generate code to copy from an incoming value vector to the destination partition's outgoing value vector - int fieldId = 0; JExpression inIndex = JExpr.direct("inIndex"); JExpression outIndex = JExpr.direct("outIndex"); + JType outgoingVectorArrayType = cg.getModel().ref(ValueVector.class).array().array(); + JType outgoingBatchArrayType = cg.getModel().ref(OutgoingRecordBatch.class).array(); cg.rotateBlock(); - // declare array of record batches for each partition + // declare 
and assign the array of outgoing record batches JVar outgoingBatches = cg.clazz.field(JMod.NONE, - cg.getModel().ref(OutgoingRecordBatch.class).array(), + outgoingBatchArrayType, "outgoingBatches"); - cg.getSetupBlock().assign(outgoingBatches, JExpr.direct("outgoing")); - // declare incoming value vectors - List<JVar> incomingVVs = Lists.newArrayList(); - for (VectorWrapper<?> vvIn : incoming) - incomingVVs.add(cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(), - fieldId++, - vvIn.isHyper()))); + // declare a two-dimensional array of value vectors; batch is first dimension, ValueVector is the second + JVar outgoingVectors = cg.clazz.field(JMod.NONE, + outgoingVectorArrayType, + "outgoingVectors"); + // create 2d array and build initialization list. For example: + // outgoingVectors = new ValueVector[][] { + // new ValueVector[] {vv1, vv2}, + // new ValueVector[] {vv3, vv4} + // }); + JArray outgoingVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array()); + + int fieldId = 0; int batchId = 0; - fieldId = 0; - // generate switch statement for each destination batch - JSwitch switchStatement = cg.getBlock()._switch(outIndex); for (OutgoingRecordBatch batch : outgoing) { - // generate case statement for this batch - JBlock caseBlock = switchStatement._case(JExpr.lit(batchId)).body(); - + JArray outgoingVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class)); for (VectorWrapper<?> vv : batch) { - // declare outgoing value vector and a corresponding counter + // declare outgoing value vector and assign it to the array JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]", new TypedFieldId(vv.getField().getType(), fieldId, false)); - - caseBlock.add(outVV.invoke("copyFrom") - .arg(inIndex) - .arg(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("getRecordCount")) - .arg(incomingVVs.get(fieldId))); + // add vv to initialization list (e.g. 
{ vv1, vv2, vv3 } ) + outgoingVectorInitBatch.add(outVV); ++fieldId; } - caseBlock.add(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("incRecordCount")); - caseBlock.add(JExpr.direct("outgoingBatches[" + batchId + "]").invoke("flushIfNecessary")); - fieldId = 0; - caseBlock._break(); + + // add VV array to initialization list (e.g. new ValueVector[] { ... }) + outgoingVectorInit.add(outgoingVectorInitBatch); ++batchId; + fieldId = 0; } + // generate outgoing value vector 2d array initialization list. + cg.getSetupBlock().assign(outgoingVectors, outgoingVectorInit); + + for (VectorWrapper<?> vvIn : incoming) { + // declare incoming value vectors + JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(), + fieldId, + vvIn.isHyper())); + + // generate the copyFrom() invocation with explicit cast to the appropriate type + Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(), + vvIn.getField().getType().getMode()); + JClass vvClass = cg.getModel().ref(vvType); + // the following block generates calls to copyFrom(); e.g.: + // ((IntVector) outgoingVectors[outIndex][0]).copyFrom(inIndex, + // outgoingBatches[outIndex].getRecordCount(), + // vv1); + cg.getBlock().add( + ((JExpression) JExpr.cast(vvClass, + ((JExpression) + outgoingVectors + .component(outIndex)) + .component(JExpr.lit(fieldId)))) + .invoke("copyFrom") + .arg(inIndex) + .arg(((JExpression) outgoingBatches.component(outIndex)).invoke("getRecordCount")) + .arg(incomingVV)); + + // generate the OutgoingRecordBatch helper invocations + cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("incRecordCount")); + cg.getBlock().add(((JExpression) outgoingBatches.component(outIndex)).invoke("flushIfNecessary")); + ++fieldId; + } try { // compile and setup generated code partitioner = context.getImplementationClassMultipleOutput(cg); @@ -199,7 +236,7 @@ class PartitionSenderRootExec implements 
RootExec { * @param isLastBatch true if this is the last incoming batch * @param schemaChanged true if the schema has changed */ - public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) { + public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws SchemaChangeException { for (OutgoingRecordBatch batch : outgoing) { logger.debug("Attempting to flush all outgoing batches"); if (isLastBatch) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8ffc674b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java index 4ecbd0e..48d7f5d 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java @@ -20,17 +20,20 @@ package org.apache.drill.exec.work; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.FragmentStatus; +import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.Builder; +import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState; import org.apache.drill.exec.rpc.bit.BitTunnel; +import org.apache.drill.exec.work.foreman.ErrorHelper; /** * Informs remote node as fragment changes state. 
*/ -public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFragmentRunnerListener.class); +public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListener{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemotingFragmentRunnerListener.class); private final BitTunnel tunnel; - public RemoteFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) { + public RemotingFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) { super(context); this.tunnel = tunnel; } @@ -38,7 +41,7 @@ public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener @Override protected void statusChange(FragmentHandle handle, FragmentStatus status) { - logger.debug("Sending remote status message. {}", status); + logger.debug("Sending status change message message to remote node: " + status); tunnel.sendFragmentStatus(status); }
