Updated Branches: refs/heads/master 0e830960f -> b07682084
DRILL-299: OutgoingRecordBatch trying to get RecordCount on incoming batch with outcome NOT_YET Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b0768208 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b0768208 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b0768208 Branch: refs/heads/master Commit: b07682084da96469e310028e67b365d005f99bdb Parents: 0e83096 Author: Ben Becker <[email protected]> Authored: Fri Nov 15 16:35:12 2013 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Fri Nov 15 16:35:12 2013 -0800 ---------------------------------------------------------------------- .../partitionsender/OutgoingRecordBatch.java | 41 ++++---------------- .../physical/impl/TestHashToRandomExchange.java | 2 - 2 files changed, 8 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 2647ffc..081b4c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.SchemaBuilder; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; @@ -50,7 +51,7 @@ import com.google.common.base.Preconditions; * next() will never be called on this object. When a record batch is ready to send (e.g. nearing size * limit or schema change), call flush() to send the batch. */ -public class OutgoingRecordBatch implements RecordBatch { +public class OutgoingRecordBatch implements VectorAccessible { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OutgoingRecordBatch.class); private BitTunnel tunnel; @@ -64,6 +65,8 @@ public class OutgoingRecordBatch implements RecordBatch { private int recordCount; private int recordCapacity; private int oppositeMinorFragmentId; + private static int DEFAULT_ALLOC_SIZE = 20000; + private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048; public OutgoingRecordBatch(HashPartitionSender operator, BitTunnel tunnel, RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) { this.incoming = incoming; @@ -71,7 +74,6 @@ public class OutgoingRecordBatch implements RecordBatch { this.operator = operator; this.tunnel = tunnel; this.oppositeMinorFragmentId = oppositeMinorFragmentId; - initializeBatch(); } public void flushIfNecessary() { @@ -137,8 +139,8 @@ public class OutgoingRecordBatch implements RecordBatch { // must remain valid. recordCount = 0; for (VectorWrapper<?> v : vectorContainer) { - logger.debug("Reallocating vv to capacity " + incoming.getRecordCount() + " after flush."); - VectorAllocator.getAllocator(v.getValueVector(), v.getValueVector()).alloc(incoming.getRecordCount()); + logger.debug("Reallocating vv to capacity " + DEFAULT_ALLOC_SIZE + " after flush."); + VectorAllocator.getAllocator(v.getValueVector(), DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_ALLOC_SIZE); } if (!ok) { throw new SchemaChangeException("Flush ended NOT OK!"); } return true; @@ -150,7 +152,7 @@ public class OutgoingRecordBatch implements RecordBatch { */ public void initializeBatch() { isLast = false; - recordCapacity = incoming.getRecordCount(); + recordCapacity = DEFAULT_ALLOC_SIZE; vectorContainer = new VectorContainer(); SchemaBuilder bldr = BatchSchema.newBuilder().setSelectionVectorMode(BatchSchema.SelectionVectorMode.NONE); @@ -186,17 +188,6 @@ public class OutgoingRecordBatch implements RecordBatch { } @Override - public IterOutcome next() { - assert false; - return IterOutcome.STOP; - } - - @Override - public FragmentContext getContext() { - return context; - } - - @Override public BatchSchema getSchema() { Preconditions.checkNotNull(outSchema); return outSchema; @@ -208,21 +199,6 @@ public class OutgoingRecordBatch implements RecordBatch { } @Override - public void kill() { - incoming.kill(); - } - - @Override - public SelectionVector2 getSelectionVector2() { - throw new UnsupportedOperationException(); - } - - @Override - public SelectionVector4 getSelectionVector4() { - throw new UnsupportedOperationException(); - } - - @Override public TypedFieldId getValueVectorId(SchemaPath path) { return vectorContainer.getValueVectorId(path); } @@ -237,9 +213,8 @@ public class OutgoingRecordBatch implements RecordBatch { return vectorContainer.iterator(); } - @Override public WritableBatch getWritableBatch() { - return WritableBatch.get(this); + return WritableBatch.getBatchNoHVWrap(recordCount, this, false); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b0768208/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java index 0cc09b4..bbe1c18 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestHashToRandomExchange.java @@ -38,8 +38,6 @@ import com.google.common.io.Files; public class TestHashToRandomExchange extends PopUnitTestBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestHashToRandomExchange.class); - //Todo reenable this test once fix for has partition assignments is included - @Ignore @Test public void twoBitTwoExchangeTwoEntryRun() throws Exception { RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
