Repository: incubator-drill
Updated Branches:
  refs/heads/master c8a08c3e7 -> e1e5ea0ed
DRILL-854: PartitionSender generated code is too large

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/58cf129b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/58cf129b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/58cf129b

Branch: refs/heads/master
Commit: 58cf129bb801c40410b22f300150185fde857ce7
Parents: c8a08c3
Author: Steven Phillips <[email protected]>
Authored: Wed May 28 22:12:42 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Wed May 28 22:19:47 2014 -0700

----------------------------------------------------------------------
 .../partitionsender/OutgoingRecordBatch.java    | 262 ------------------
 .../PartitionSenderRootExec.java                | 252 +++++------------
 .../impl/partitionsender/Partitioner.java       |  18 +-
 .../partitionsender/PartitionerTemplate.java    | 272 +++++++++++++++++--
 .../impl/partitionsender/StatusHandler.java     |  64 +++++
 .../impl/svremover/RemovingRecordBatch.java     |  46 +---
 .../physical/impl/xsort/ExternalSortBatch.java  |   3 +-
 .../org/apache/drill/exec/vector/CopyUtil.java  |  65 +++++
 8 files changed, 469 insertions(+), 513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
deleted file mode 100644
index c86da8c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.partitionsender;
-
-import io.netty.buffer.ByteBuf;
-
-import java.util.Iterator;
-
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorStats;
-import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.proto.ExecProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.FragmentWritableBatch;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.SchemaBuilder;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.data.DataTunnel;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
-import org.apache.drill.exec.work.ErrorHelper;
-
-import com.google.common.base.Preconditions;
-
-/**
- * OutgoingRecordBatch is a holder of value vectors which are to be sent to another host.  Thus,
- * next() will never be called on this object.  When a record batch is ready to send (e.g. nearing size
- * limit or schema change), call flush() to send the batch.
- */ -public class OutgoingRecordBatch implements VectorAccessible { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutgoingRecordBatch.class); - - private final DataTunnel tunnel; - private final HashPartitionSender operator; - private final RecordBatch incoming; - private final FragmentContext context; - private final BufferAllocator allocator; - private final VectorContainer vectorContainer = new VectorContainer(); - private final SendingAccountor sendCount; - private final int oppositeMinorFragmentId; - - private boolean isLast = false; - private volatile boolean ok = true; - private BatchSchema outSchema; - private int recordCount; - private OperatorStats stats; - private static final int DEFAULT_RECORD_BATCH_SIZE = 20000; - private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200; - - public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming, - FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) { - this.incoming = incoming; - this.context = context; - this.allocator = allocator; - this.operator = operator; - this.tunnel = tunnel; - this.sendCount = sendCount; - this.stats = stats; - this.oppositeMinorFragmentId = oppositeMinorFragmentId; - } - - public void flushIfNecessary() { - try { - if (recordCount == DEFAULT_RECORD_BATCH_SIZE) { - flush(); - stats.addLongStat(PartitionSenderStats.BATCHES_SENT, 1l); - stats.addLongStat(PartitionSenderStats.RECORDS_SENT, recordCount); - } - } catch (SchemaChangeException e) { - incoming.kill(); - logger.error("Error flushing outgoing batches", e); - context.fail(e); - } - } - - public void incRecordCount() { - ++recordCount; - } - - /** - * Send the record batch to the target node, then reset the value vectors - * - * @return true if a flush was needed; otherwise false - * @throws SchemaChangeException - */ - public boolean flush() throws SchemaChangeException { - final ExecProtos.FragmentHandle handle = context.getHandle(); - - if (recordCount != 0) { - - for(VectorWrapper<?> w : vectorContainer){ - w.getValueVector().getMutator().setValueCount(recordCount); - } - - -// BatchPrinter.printBatch(vectorContainer); - - FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, - handle.getQueryId(), - handle.getMajorFragmentId(), - handle.getMinorFragmentId(), - operator.getOppositeMajorFragmentId(), - oppositeMinorFragmentId, - getWritableBatch()); - - tunnel.sendRecordBatch(statusHandler, writableBatch); - this.sendCount.increment(); - } else { - logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : "")); - if (isLast) { - // send final (empty) batch - FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, - handle.getQueryId(), - handle.getMajorFragmentId(), - handle.getMinorFragmentId(), - operator.getOppositeMajorFragmentId(), - oppositeMinorFragmentId, - getWritableBatch()); - tunnel.sendRecordBatch(statusHandler, writableBatch); - this.sendCount.increment(); - vectorContainer.clear(); - return true; - } - } - - // reset values and reallocate the buffer for each value vector based on the incoming batch. - // NOTE: the value vector is directly referenced by generated code; therefore references - // must remain valid. 
- recordCount = 0; - vectorContainer.zeroVectors(); - for (VectorWrapper<?> v : vectorContainer) { -// logger.debug("Reallocating vv to capacity " + DEFAULT_RECORD_BATCH_SIZE + " after flush."); - VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); - } - if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); } - return true; - } - - - /** - * Create a new output schema and allocate space for value vectors based on the incoming record batch. - */ - public void initializeBatch() { - isLast = false; - vectorContainer.clear(); - - SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); - for (VectorWrapper<?> v : incoming) { - - // add field to the output schema - bldr.addField(v.getField()); - - // allocate a new value vector - ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator); - VectorAllocator.getAllocator(outgoingVector, DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); - vectorContainer.add(outgoingVector); -// logger.debug("Reallocating to cap " + DEFAULT_RECORD_BATCH_SIZE + " because of newly init'd vector : " + v.getValueVector()); - } - outSchema = bldr.build(); -// logger.debug("Initialized OutgoingRecordBatch. RecordCount: " + recordCount + ", cap: " + DEFAULT_RECORD_BATCH_SIZE + " Schema: " + outSchema); - } - - /** - * Free any existing value vectors, create new output schema, and allocate value vectors based - * on the incoming record batch. - */ - public void resetBatch() { - isLast = false; - recordCount = 0; - for (VectorWrapper<?> v : vectorContainer){ - v.getValueVector().clear(); - } - } - - public void setIsLast() { - isLast = true; - } - - @Override - public BatchSchema getSchema() { - Preconditions.checkNotNull(outSchema); - return outSchema; - } - - @Override - public int getRecordCount() { - return recordCount; - } - - @Override - public TypedFieldId getValueVectorId(SchemaPath path) { - return vectorContainer.getValueVectorId(path); - } - - @Override - public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
fieldIds) { - return vectorContainer.getValueAccessorById(clazz, fieldIds); - } - - @Override - public Iterator<VectorWrapper<?>> iterator() { - return vectorContainer.iterator(); - } - - public WritableBatch getWritableBatch() { - return WritableBatch.getBatchNoHVWrap(recordCount, this, false); - } - - - private StatusHandler statusHandler = new StatusHandler(); - private class StatusHandler extends BaseRpcOutcomeListener<GeneralRPCProtos.Ack> { - RpcException ex; - - @Override - public void success(Ack value, ByteBuf buffer) { - sendCount.decrement(); - super.success(value, buffer); - } - - @Override - public void failed(RpcException ex) { - sendCount.decrement(); - logger.error("Failure while sending data to user.", ex); - ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger); - ok = false; - this.ex = ex; - } - - } - - public void clear(){ - vectorContainer.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index d0eaf9a..74a3c90 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -18,7 +18,7 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.io.IOException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ExecutionException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -28,7 +28,6 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; @@ -36,34 +35,32 @@ import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; -import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; -import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.record.*; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.rpc.data.DataTunnel; -import com.sun.codemodel.JArray; -import com.sun.codemodel.JClass; import com.sun.codemodel.JExpr; import com.sun.codemodel.JExpression; 
-import com.sun.codemodel.JMod; import com.sun.codemodel.JType; -import com.sun.codemodel.JVar; +import org.apache.drill.exec.vector.CopyUtil; + public class PartitionSenderRootExec implements RootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); private RecordBatch incoming; private HashPartitionSender operator; - private OutgoingRecordBatch[] outgoing; private Partitioner partitioner; private FragmentContext context; private OperatorContext oContext; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); private final OperatorStats stats; + private final int outGoingBatchCount; + private final HashPartitionSender popConfig; public PartitionSenderRootExec(FragmentContext context, @@ -75,18 +72,8 @@ public class PartitionSenderRootExec implements RootExec { this.context = context; this.oContext = new OperatorContext(operator, context); this.stats = oContext.getStats(); - this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()]; - int fieldId = 0; - for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) { - FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(operator.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build(); - outgoing[fieldId] = new OutgoingRecordBatch(stats, sendCount, operator, - context.getDataTunnel(endpoint, opposite), - incoming, - context, - oContext.getAllocator(), - fieldId); - fieldId++; - } + this.outGoingBatchCount = operator.getDestinations().size(); + this.popConfig = operator; } @Override @@ -105,11 +92,12 @@ public class PartitionSenderRootExec implements RootExec { case NONE: try { // send any pending batches - for (OutgoingRecordBatch batch : outgoing) { - batch.setIsLast(); - batch.flush(); + if(partitioner != null) { + partitioner.flushOutgoingBatches(true, false); + } else { + sendEmptyBatch(); } - } catch (SchemaChangeException e) { + } catch (IOException e) { incoming.kill(); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); context.fail(e); @@ -117,8 +105,8 @@ public class PartitionSenderRootExec implements RootExec { return false; case STOP: - for (OutgoingRecordBatch batch : outgoing) { - batch.clear(); + if (partitioner != null) { + partitioner.clear(); } return false; @@ -127,22 +115,31 @@ public class PartitionSenderRootExec implements RootExec { try { // send all existing batches if (partitioner != null) { - flushOutgoingBatches(false, true); - } - for (OutgoingRecordBatch b : outgoing) { - b.initializeBatch(); + partitioner.flushOutgoingBatches(false, true); + partitioner.clear(); } - // update OutgoingRecordBatch's schema and generate partitioning code + // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code createPartitioner(); + } catch (IOException e) { + incoming.kill(); + logger.error("Error while flushing outgoing batches", e); + context.fail(e); + return false; } catch (SchemaChangeException e) { incoming.kill(); - logger.error("Error while creating partitioning sender or flushing outgoing batches", e); + logger.error("Error while setting up partitioner", e); context.fail(e); return false; } case OK: stats.batchReceived(0, incoming.getRecordCount(), newSchema); - partitioner.partitionBatch(incoming); + try { + partitioner.partitionBatch(incoming); + } catch (IOException e) { + incoming.kill(); + context.fail(e); + return false; + } for (VectorWrapper v : incoming) { v.clear(); } @@ -153,31 +150,6 
@@ public class PartitionSenderRootExec implements RootExec { } } - - - private void generatePartitionFunction() throws SchemaChangeException { - - LogicalExpression filterExpression = operator.getExpr(); - final ErrorCollector collector = new ErrorCollectorImpl(); - final ClassGenerator<Partitioner> cg = CodeGenerator.get(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry()).getRoot(); - - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(filterExpression, incoming, collector,context.getFunctionRegistry()); - if(collector.hasErrors()){ - throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); - } - - cg.addExpr(new ReturnValueExpression(expr)); - - try { - Partitioner p = context.getImplementationClass(cg); - p.setup(context, incoming, outgoing); - } catch (ClassTransformationException | IOException e) { - throw new SchemaChangeException("Failure while attempting to load generated class", e); - } - - - } - private void createPartitioner() throws SchemaChangeException { // set up partitioning function @@ -188,6 +160,7 @@ public class PartitionSenderRootExec implements RootExec { boolean hyper = false; cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + ClassGenerator<Partitioner> cgInner = cg.getInnerGenerator("OutgoingRecordBatch"); final LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); if (collector.hasErrors()) { @@ -197,153 +170,56 @@ public class PartitionSenderRootExec implements RootExec { } // generate code to copy from an incoming value vector to the destination partition's outgoing value vector - JExpression inIndex = JExpr.direct("inIndex"); JExpression bucket = JExpr.direct("bucket"); - JType outgoingVectorArrayType = cg.getModel().ref(ValueVector.class).array().array(); - JType outgoingBatchArrayType = cg.getModel().ref(OutgoingRecordBatch.class).array(); // generate evaluate expression to determine the hash ClassGenerator.HoldingContainer exprHolder = cg.addExpr(materializedExpr); - cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "bucket", exprHolder.getValue().mod(JExpr.lit(outgoing.length))); - cg.getEvalBlock().assign(JExpr.ref("bucket"), cg.getModel().ref(Math.class).staticInvoke("abs").arg(bucket)); - // declare and assign the array of outgoing record batches - JVar outgoingBatches = cg.clazz.field(JMod.NONE, - outgoingBatchArrayType, - "outgoingBatches"); - cg.getSetupBlock().assign(outgoingBatches, JExpr.direct("outgoing")); - - // declare a two-dimensional array of value vectors; batch is first dimension, ValueVector is the second - JVar outgoingVectors = cg.clazz.field(JMod.NONE, - outgoingVectorArrayType, - "outgoingVectors"); - - // create 2d array and build initialization list. 
For example: - // outgoingVectors = new ValueVector[][] { - // new ValueVector[] {vv1, vv2}, - // new ValueVector[] {vv3, vv4} - // }); - JArray outgoingVectorInit = JExpr.newArray(cg.getModel().ref(ValueVector.class).array()); - - int fieldId = 0; - int batchId = 0; - for (OutgoingRecordBatch batch : outgoing) { - - JArray outgoingVectorInitBatch = JExpr.newArray(cg.getModel().ref(ValueVector.class)); - for (VectorWrapper<?> vv : batch) { - // declare outgoing value vector and assign it to the array - JVar outVV = cg.declareVectorValueSetupAndMember("outgoing[" + batchId + "]", - new TypedFieldId(vv.getField().getType(), - false, - fieldId)); - // add vv to initialization list (e.g. { vv1, vv2, vv3 } ) - outgoingVectorInitBatch.add(outVV); - ++fieldId; - } - - // add VV array to initialization list (e.g. new ValueVector[] { ... }) - outgoingVectorInit.add(outgoingVectorInitBatch); - ++batchId; - fieldId = 0; - } - - // generate outgoing value vector 2d array initialization list. - cg.getSetupBlock().assign(outgoingVectors, outgoingVectorInit); + cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "bucket", exprHolder.getValue().mod(JExpr.lit(outGoingBatchCount))); + cg.getEvalBlock()._return(cg.getModel().ref(Math.class).staticInvoke("abs").arg(bucket)); - for (VectorWrapper<?> vvIn : incoming) { - // declare incoming value vectors - JVar incomingVV = cg.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vvIn.getField().getType(), - vvIn.isHyper(), - fieldId)); + CopyUtil.generateCopies(cgInner, incoming, incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - // generate the copyFrom() invocation with explicit cast to the appropriate type - Class<?> vvType = TypeHelper.getValueVectorClass(vvIn.getField().getType().getMinorType(), - vvIn.getField().getType().getMode()); - JClass vvClass = cg.getModel().ref(vvType); - - if (!hyper) { - // the following block generates calls to copyFrom(); e.g.: - // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex, - // outgoingBatches[bucket].getRecordCount(), - // vv1); - cg.getEvalBlock()._if( - ((JExpression) JExpr.cast(vvClass, - ((JExpression) - outgoingVectors - .component(bucket)) - .component(JExpr.lit(fieldId)))) - .invoke("copyFromSafe") - .arg(inIndex) - .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) - .arg(incomingVV).not()) - ._then() - .add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) - ._return(JExpr.lit(false)); - } else { - // the following block generates calls to copyFrom(); e.g.: - // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex, - // outgoingBatches[bucket].getRecordCount(), - // vv1[((inIndex)>>> 16)]); - cg.getEvalBlock()._if( - ((JExpression) JExpr.cast(vvClass, - ((JExpression) - outgoingVectors - .component(bucket)) - .component(JExpr.lit(fieldId)))) - .invoke("copyFromSafe") - .arg(inIndex) - .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) - .arg(incomingVV.component(inIndex.shrz(JExpr.lit(16)))).not()) - ._then() - .add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) - ._return(JExpr.lit(false)); - - } - ++fieldId; - } - // generate the OutgoingRecordBatch helper invocations - cg.getEvalBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("incRecordCount")); - cg.getEvalBlock().add(((JExpression) outgoingBatches.component(bucket)).invoke("flushIfNecessary")); - cg.getEvalBlock()._return(JExpr.lit(true)); try { // compile and setup generated 
code // partitioner = context.getImplementationClassMultipleOutput(cg); partitioner = context.getImplementationClass(cg); - partitioner.setup(context, incoming, outgoing); + partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext); } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); } } - /** - * Flush each outgoing record batch, and optionally reset the state of each outgoing record - * batch (on schema change). Note that the schema is updated based on incoming at the time - * this function is invoked. - * - * @param isLastBatch true if this is the last incoming batch - * @param schemaChanged true if the schema has changed - */ - public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws SchemaChangeException { - for (OutgoingRecordBatch batch : outgoing) { - logger.debug("Attempting to flush all outgoing batches"); - if (isLastBatch) - batch.setIsLast(); - batch.flush(); - if (schemaChanged) { - batch.resetBatch(); - batch.initializeBatch(); - } - } - } - public void stop() { logger.debug("Partition sender stopping."); ok = false; - for(OutgoingRecordBatch b : outgoing){ - b.clear(); + if (partitioner != null) { + partitioner.clear(); } sendCount.waitForSendComplete(); oContext.close(); incoming.cleanup(); } + + public void sendEmptyBatch() { + FragmentHandle handle = context.getHandle(); + int fieldId = 0; + VectorContainer container = new VectorContainer(); + StatusHandler statusHandler = new StatusHandler(sendCount, context); + for (DrillbitEndpoint endpoint : popConfig.getDestinations()) { + FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build(); + DataTunnel tunnel = context.getDataTunnel(endpoint, opposite); + FragmentWritableBatch writableBatch = new FragmentWritableBatch(true, + handle.getQueryId(), + handle.getMajorFragmentId(), + handle.getMinorFragmentId(), + operator.getOppositeMajorFragmentId(), + fieldId, + WritableBatch.getBatchNoHVWrap(0, container, false)); + tunnel.sendRecordBatch(statusHandler, writableBatch); + this.sendCount.increment(); + fieldId++; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 7d3998b..8d6c19a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -20,15 +20,27 @@ package org.apache.drill.exec.physical.impl.partitionsender; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.record.RecordBatch; +import java.io.IOException; + public interface Partitioner { public abstract 
void setup(FragmentContext context, - RecordBatch incoming, - OutgoingRecordBatch[] outgoing) throws SchemaChangeException; + RecordBatch incoming, + HashPartitionSender popConfig, + OperatorStats stats, + SendingAccountor sendingAccountor, + OperatorContext oContext) throws SchemaChangeException; - public abstract void partitionBatch(RecordBatch incoming); + public abstract void partitionBatch(RecordBatch incoming) throws IOException; + public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; + public abstract void initialize(); + public abstract void clear(); public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index fe62b73..4a27262 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -19,20 +19,46 @@ package org.apache.drill.exec.physical.impl.partitionsender; import javax.inject.Named; -import org.apache.drill.exec.ExecConstants; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.impl.SendingAccountor; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.util.BatchPrinter; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.allocator.VectorAllocator; +import org.apache.drill.exec.work.ErrorHelper; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; public abstract class PartitionerTemplate implements Partitioner { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionerTemplate.class); 
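[Editor's note] The reworked Partitioner contract above splits the work in two: the generated doEval(inIndex) now only computes a destination bucket, while the per-field copies move into the template's nested OutgoingRecordBatch further down in this file, so the outer generated class stays small regardless of the number of destinations. A condensed sketch of how PartitionSenderRootExec drives this interface -- the driver loop and exception handling are illustrative, only the method names come from this commit:

    partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext);
    while (true) {                                       // illustrative; the real flow is innerNext() above
      switch (incoming.next()) {
        case OK:
          partitioner.partitionBatch(incoming);          // doEval() picks a bucket, copy() fills it
          break;
        case OK_NEW_SCHEMA:
          partitioner.flushOutgoingBatches(false, true); // flush, then reset batches for the new schema
          partitioner.clear();
          // ... regenerate the Partitioner for the new schema, then setup() again
          break;
        case NONE:
          partitioner.flushOutgoingBatches(true, false); // mark batches last and flush
          return;
        case STOP:
          partitioner.clear();
          return;
      }
    }
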
private SelectionVector2 sv2; private SelectionVector4 sv4; + private RecordBatch incoming; + private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList(); private static final String REWRITE_MSG = "Failed to write the record {} in available space. Attempting to rewrite."; private static final String RECORD_TOO_BIG_MSG = "Record {} is too big to fit into the allocated memory of ValueVector."; @@ -40,12 +66,26 @@ public abstract class PartitionerTemplate implements Partitioner { public PartitionerTemplate() throws SchemaChangeException { } - @Override public final void setup(FragmentContext context, RecordBatch incoming, - OutgoingRecordBatch[] outgoing) throws SchemaChangeException { + HashPartitionSender popConfig, + OperatorStats stats, + SendingAccountor sendingAccountor, + OperatorContext oContext) throws SchemaChangeException { + + this.incoming = incoming; + doSetup(context, incoming, null); + + int fieldId = 0; + for (DrillbitEndpoint endpoint : popConfig.getDestinations()) { + FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build(); + outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId)); + fieldId++; + } - doSetup(context, incoming, outgoing); + for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { + outgoingRecordBatch.initializeBatch(); + } SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); switch(svMode){ @@ -65,17 +105,41 @@ public abstract class PartitionerTemplate implements Partitioner { } } + /** + * Flush each outgoing record batch, and optionally reset the state of each outgoing record + * batch (on schema change). Note that the schema is updated based on incoming at the time + * this function is invoked. + * + * @param isLastBatch true if this is the last incoming batch + * @param schemaChanged true if the schema has changed + */ + public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException { + for (OutgoingRecordBatch batch : outgoingBatches) { + logger.debug("Attempting to flush all outgoing batches"); + if (isLastBatch) { + batch.setIsLast(); + } + batch.flush(); + if (schemaChanged) { + batch.resetBatch(); + batch.initializeBatch(); + } + } + } + @Override - public void partitionBatch(RecordBatch incoming) { + public void partitionBatch(RecordBatch incoming) throws IOException { SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); // Keeping the for loop inside the case to avoid case evaluation for each record. 
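[Editor's note] Every case in the switch below repeats the same per-record protocol, condensed here once for reference; svIndex is the record's resolved position (the loop index for NONE, an sv2/sv4 lookup otherwise), and IOException handling is elided:

    OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex));
    if (!outgoingBatch.copy(svIndex)) {     // copyFromSafe ran out of vector space
      outgoingBatch.flush();                // ship the partial batch and reallocate
      if (!outgoingBatch.copy(svIndex)) {   // still failing: the record itself is too big
        logger.debug(RECORD_TOO_BIG_MSG, recordId);
      }
    }
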
switch(svMode) { case NONE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { - if (!doEval(recordId)) { + OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(recordId)); + if (!outgoingBatch.copy(recordId)) { logger.trace(REWRITE_MSG, recordId); - if (!doEval(recordId)) { + outgoingBatch.flush(); + if (!outgoingBatch.copy(recordId)) { logger.debug(RECORD_TOO_BIG_MSG, recordId); } } @@ -85,9 +149,10 @@ public abstract class PartitionerTemplate implements Partitioner { case TWO_BYTE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { int svIndex = sv2.getIndex(recordId); - if (!doEval(svIndex)) { - logger.trace(REWRITE_MSG, recordId); - if (!doEval(svIndex)) { + OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); + if (!outgoingBatch.copy(svIndex)) { + logger.trace(REWRITE_MSG, svIndex); + if (!outgoingBatch.copy(svIndex)) { logger.debug(RECORD_TOO_BIG_MSG, recordId); } } @@ -97,9 +162,10 @@ public abstract class PartitionerTemplate implements Partitioner { case FOUR_BYTE: for (int recordId = 0; recordId < incoming.getRecordCount(); ++recordId) { int svIndex = sv4.get(recordId); - if (!doEval(svIndex)) { - logger.trace(REWRITE_MSG, recordId); - if (!doEval(svIndex)) { + OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); + if (!outgoingBatch.copy(svIndex)) { + logger.trace(REWRITE_MSG, svIndex); + if (!outgoingBatch.copy(svIndex)) { logger.debug(RECORD_TOO_BIG_MSG, recordId); } } @@ -111,6 +177,180 @@ public abstract class PartitionerTemplate implements Partitioner { } } + @Override + public void clear() { + for (OutgoingRecordBatch outgoingRecordBatch : outgoingBatches) { + outgoingRecordBatch.clear(); + } + } + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; - public abstract boolean doEval(@Named("inIndex") int inIndex); + public abstract int doEval(@Named("inIndex") int inIndex); + + public class OutgoingRecordBatch implements VectorAccessible { + + private final DataTunnel tunnel; + private final HashPartitionSender operator; + private final FragmentContext context; + private final BufferAllocator allocator; + private final VectorContainer vectorContainer = new VectorContainer(); + private final SendingAccountor sendCount; + private final int oppositeMinorFragmentId; + + private boolean isLast = false; + private BatchSchema outSchema; + private int recordCount; + private OperatorStats stats; + private static final int DEFAULT_RECORD_BATCH_SIZE = 20000; + private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200; + + private StatusHandler statusHandler; + + public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, + FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) { + this.context = context; + this.allocator = allocator; + this.operator = operator; + this.tunnel = tunnel; + this.sendCount = sendCount; + this.stats = stats; + this.oppositeMinorFragmentId = oppositeMinorFragmentId; + this.statusHandler = new StatusHandler(sendCount, context); + } + + protected boolean copy(int inIndex) throws IOException { + if (doEval(inIndex, recordCount)) { + recordCount++; + if (recordCount == DEFAULT_RECORD_BATCH_SIZE) { + flush(); + } + return true; + } + return false; + } + + @RuntimeOverridden + protected void doSetup(@Named("incoming") RecordBatch 
incoming, @Named("outgoing") VectorAccessible outgoing) {}; + + @RuntimeOverridden + protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; }; + + public void flush() throws IOException { + final ExecProtos.FragmentHandle handle = context.getHandle(); + + if (recordCount != 0) { + + for(VectorWrapper<?> w : vectorContainer){ + w.getValueVector().getMutator().setValueCount(recordCount); + } + + FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, + handle.getQueryId(), + handle.getMajorFragmentId(), + handle.getMinorFragmentId(), + operator.getOppositeMajorFragmentId(), + oppositeMinorFragmentId, + getWritableBatch()); + + tunnel.sendRecordBatch(statusHandler, writableBatch); + this.sendCount.increment(); + } else { + logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : "")); + if (isLast) { + // send final (empty) batch + FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, + handle.getQueryId(), + handle.getMajorFragmentId(), + handle.getMinorFragmentId(), + operator.getOppositeMajorFragmentId(), + oppositeMinorFragmentId, + getWritableBatch()); + tunnel.sendRecordBatch(statusHandler, writableBatch); + this.sendCount.increment(); + vectorContainer.clear(); + return; + } + } + + // reset values and reallocate the buffer for each value vector based on the incoming batch. + // NOTE: the value vector is directly referenced by generated code; therefore references + // must remain valid. + recordCount = 0; + vectorContainer.zeroVectors(); + for (VectorWrapper<?> v : vectorContainer) { + VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); + } + if (!statusHandler.isOk()) { + throw new IOException(statusHandler.getException()); + } + } + + public void initializeBatch() { + isLast = false; + vectorContainer.clear(); + + SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); + for (VectorWrapper<?> v : incoming) { + + // add field to the output schema + bldr.addField(v.getField()); + + // allocate a new value vector + ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator); + VectorAllocator.getAllocator(outgoingVector, DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE); + vectorContainer.add(outgoingVector); + } + outSchema = bldr.build(); + doSetup(incoming, vectorContainer); + } + + public void resetBatch() { + isLast = false; + recordCount = 0; + for (VectorWrapper<?> v : vectorContainer){ + v.getValueVector().clear(); + } + } + + public void setIsLast() { + isLast = true; + } + + @Override + public BatchSchema getSchema() { + Preconditions.checkNotNull(outSchema); + return outSchema; + } + + @Override + public int getRecordCount() { + return recordCount; + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return vectorContainer.getValueVectorId(path); + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
fieldIds) { + return vectorContainer.getValueAccessorById(clazz, fieldIds); + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + return vectorContainer.iterator(); + } + + public WritableBatch getWritableBatch() { + return WritableBatch.getBatchNoHVWrap(recordCount, this, false); + } + + + + + public void clear(){ + vectorContainer.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java new file mode 100644 index 0000000..e3f9eae --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/StatusHandler.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.partitionsender; + + +import io.netty.buffer.ByteBuf; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.SendingAccountor; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.work.ErrorHelper; + +public class StatusHandler extends BaseRpcOutcomeListener<Ack> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatusHandler.class); + RpcException ex; + SendingAccountor sendCount; + FragmentContext context; + boolean ok = true; + + public StatusHandler(SendingAccountor sendCount, FragmentContext context) { + this.sendCount = sendCount; + this.context = context; + } + + @Override + public void success(Ack value, ByteBuf buffer) { + sendCount.decrement(); + super.success(value, buffer); + } + + @Override + public void failed(RpcException ex) { + sendCount.decrement(); + logger.error("Failure while sending data to user.", ex); + ErrorHelper.logAndConvertError(context.getIdentity(), "Failure while sending fragment to client.", ex, logger); + ok = false; + this.ex = ex; + } + + public boolean isOk() { + return ok; + } + + public RpcException getException() { + return ex; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 41b71d0..3f2e060 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -31,10 +31,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.FixedWidthVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VariableWidthVector; +import org.apache.drill.exec.vector.*; import org.apache.drill.exec.vector.allocator.FixedVectorAllocator; import org.apache.drill.exec.vector.allocator.VariableEstimatedVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; @@ -210,7 +207,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect try { final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry()); - generateCopies(cg.getRoot(), incoming, false); + CopyUtil.generateCopies(cg.getRoot(), incoming, false); Copier copier = context.getImplementationClass(cg); copier.setupRemover(context, incoming, this, null); @@ -235,7 +232,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect try { final CodeGenerator<Copier> cg = CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry()); - generateCopies(cg.getRoot(), batch, true); + CopyUtil.generateCopies(cg.getRoot(), batch, true); Copier copier = context.getImplementationClass(cg); 
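[Editor's note] On the hyper (four-byte selection vector) copier wired up here via TEMPLATE_DEFINITION4: in the code CopyUtil generates (shown in full at the end of this diff), each 32-bit index packs two values. Restated as plain Java with hypothetical variable names:

    int batchIndex  = svIndex >>> 16;                  // upper 16 bits: which incoming batch
    int recordIndex = svIndex & Character.MAX_VALUE;   // lower 16 bits: row within that batch
    // generated call: outVV.copyFromSafe(recordIndex, outIndex, inVV[batchIndex])
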
copier.setupRemover(context, batch, outgoing, null); @@ -245,43 +242,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } } - public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){ - // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all. - int fieldId = 0; - - JExpression inIndex = JExpr.direct("inIndex"); - JExpression outIndex = JExpr.direct("outIndex"); - g.rotateBlock(); - for(VectorWrapper<?> vv : batch){ - JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId)); - JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId)); - - if(hyper){ - - g.getEvalBlock()._if( - outVV - .invoke("copyFromSafe") - .arg( - inIndex.band(JExpr.lit((int) Character.MAX_VALUE))) - .arg(outIndex) - .arg( - inVV.component(inIndex.shrz(JExpr.lit(16))) - ) - .not() - ) - ._then()._return(JExpr.FALSE); - }else{ - g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); - } - - - fieldId++; - } - g.rotateBlock(); - g.getEvalBlock()._return(JExpr.TRUE); - } - - @Override public WritableBatch getWritableBatch() { return WritableBatch.get(this); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 2289680..d6cbbc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -49,6 +49,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.util.Utilities; +import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.apache.hadoop.conf.Configuration; @@ -508,7 +509,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { generateComparisons(g, batch); g.setMappingSet(COPIER_MAPPING_SET); - RemovingRecordBatch.generateCopies(g, batch, true); + CopyUtil.generateCopies(g, batch, true); g.setMappingSet(MAIN_MAPPING); copier = context.getImplementationClass(cg); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/58cf129b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java new file mode 100644 index 0000000..1f09792 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/CopyUtil.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector; + +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JExpression; +import com.sun.codemodel.JVar; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessible; +import org.apache.drill.exec.record.VectorWrapper; + +public class CopyUtil { + public static void generateCopies(ClassGenerator g, VectorAccessible batch, boolean hyper){ + // we have parallel ids for each value vector so we don't actually have to deal with managing the ids at all. + int fieldId = 0; + + JExpression inIndex = JExpr.direct("inIndex"); + JExpression outIndex = JExpr.direct("outIndex"); + g.rotateBlock(); + for(VectorWrapper<?> vv : batch){ + JVar inVV = g.declareVectorValueSetupAndMember("incoming", new TypedFieldId(vv.getField().getType(), vv.isHyper(), fieldId)); + JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), false, fieldId)); + + if(hyper){ + + g.getEvalBlock()._if( + outVV + .invoke("copyFromSafe") + .arg( + inIndex.band(JExpr.lit((int) Character.MAX_VALUE))) + .arg(outIndex) + .arg( + inVV.component(inIndex.shrz(JExpr.lit(16))) + ) + .not() + ) + ._then()._return(JExpr.FALSE); + }else{ + g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); + } + + + fieldId++; + } + g.rotateBlock(); + g.getEvalBlock()._return(JExpr.TRUE); + } + +}
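[Editor's note] Taken together, this is how the commit resolves DRILL-854: instead of unrolling copyFromSafe calls for every destination times every field into one giant generated eval method (the code deleted from PartitionSenderRootExec above), the outer generated class now holds only the hash expression, and a single generated inner OutgoingRecordBatch carries one set of per-field copies shared by all destinations. Roughly, the emitted class now has this shape -- a hypothetical one-column schema with invented class and field names, since the real source is produced at runtime by the inner ClassGenerator:

    public class PartitionerGen0 extends PartitionerTemplate {
      IntVector incoming0;                        // input to the materialized hash expression

      @Override
      public int doEval(int inIndex) {            // size no longer depends on destination count
        int bucket = incoming0.getAccessor().get(inIndex) % 4;  // 4 = outGoingBatchCount literal
        return Math.abs(bucket);
      }

      public class OutgoingRecordBatchGen extends OutgoingRecordBatch {
        IntVector in0;
        IntVector out0;

        @Override
        protected boolean doEval(int inIndex, int outIndex) {   // copies one row, all fields
          if (!out0.copyFromSafe(inIndex, outIndex, in0)) { return false; }
          return true;
        }
      }
    }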
