Fix key output when the hash aggregate emits multiple output batches (use a per-batch output index instead of a table-wide one, and report the count of the batch just output).
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e7115e91 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e7115e91 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e7115e91 Branch: refs/heads/master Commit: e7115e91875b5453ca24e7e5995c5eba5f452e29 Parents: 5baa4ad Author: Aman Sinha <asi...@maprtech.com> Authored: Sat Jun 7 19:41:54 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Sat Jun 7 19:43:10 2014 -0700 ---------------------------------------------------------------------- .../impl/aggregate/HashAggTemplate.java | 52 +++++--------------- .../physical/impl/common/HashTableTemplate.java | 21 +++----- 2 files changed, 19 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e7115e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index b65acb0..2fb3f02 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -74,6 +74,7 @@ public abstract class HashAggTemplate implements HashAggregator { private int outputCount = 0; private int numGroupedRecords = 0; private int outBatchIndex = 0; + private int lastBatchOutputCount = 0; private RecordBatch incoming; private BatchSchema schema; private RecordBatch outgoing; @@ -97,8 +98,8 @@ public abstract class HashAggTemplate implements HashAggregator { public class BatchHolder { private 
VectorContainer aggrValuesContainer; // container for aggr values (workspace variables) - int maxOccupiedIdx = -1; - int batchOutputCount = 0; + private int maxOccupiedIdx = -1; + private int batchOutputCount = 0; private BatchHolder() { @@ -130,7 +131,7 @@ public abstract class HashAggTemplate implements HashAggregator { private boolean outputValues() { for (int i = 0; i <= maxOccupiedIdx; i++) { if (outputRecordValues(i, batchOutputCount) ) { - if (EXTRA_DEBUG_2) logger.debug("Outputting values to batch index: {} output index: {}", batchOutputCount) ; + if (EXTRA_DEBUG_2) logger.debug("Outputting values to output index: {}", batchOutputCount) ; batchOutputCount++; } else { return false; @@ -275,18 +276,10 @@ public abstract class HashAggTemplate implements HashAggregator { buildComplete = true; - // outputKeysAndValues() ; - // output the first batch; remaining batches will be output // in response to each next() call by a downstream operator - // outputKeysAndValues(outBatchIndex); outputCurrentBatch(); - - //if (isLastBatchOutput()) { - // cleanup my internal state since there is nothing more to return - // this.cleanup(); - // } // cleanup incoming batch since output of aggregation does not need // any references to the incoming @@ -332,8 +325,8 @@ public abstract class HashAggTemplate implements HashAggregator { @Override public int getOutputCount() { - return outputCount; - // return batchHolders.get(outBatchIndex).getOutputCount(); + // return outputCount; + return lastBatchOutputCount; } @Override @@ -394,29 +387,6 @@ public abstract class HashAggTemplate implements HashAggregator { bh.setup(); } - - /* - private boolean outputKeysAndValues() { - - allocateOutgoing(); - - int batchIdx = 0; - for (BatchHolder bh : batchHolders) { - if (! this.htable.outputKeys(batchIdx++)) { - return false; - } - } - - for (BatchHolder bh : batchHolders) { - if (! 
bh.outputValues() ) { - return false; - } - } - - allFlushed = true ; - return true; - } -*/ // output the keys and values for a particular batch holder private boolean outputKeysAndValues(int batchIdx) { @@ -455,8 +425,9 @@ public abstract class HashAggTemplate implements HashAggregator { allocateOutgoing(batchOutputRecords); - if (this.htable.outputKeys(outBatchIndex) - && batchHolders.get(outBatchIndex).outputValues()) { + boolean outputKeysStatus = this.htable.outputKeys(outBatchIndex) ; + boolean outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(); + if (outputKeysStatus && outputValuesStatus) { // set the value count for outgoing batch value vectors for(VectorWrapper<?> v : outgoing) { @@ -472,7 +443,8 @@ public abstract class HashAggTemplate implements HashAggregator { } logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, batchOutputRecords); - + + lastBatchOutputCount = batchOutputRecords; outBatchIndex++; if (outBatchIndex == batchHolders.size()) { allFlushed = true; @@ -483,6 +455,8 @@ public abstract class HashAggTemplate implements HashAggregator { this.cleanup(); } } else { + if (!outputKeysStatus) context.fail(new Exception("Failed to output keys for current batch !")); + if (!outputValuesStatus) context.fail(new Exception("Failed to output values for current batch !")); this.outcome = IterOutcome.STOP; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e7115e91/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index f2844ac..a8af5ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -21,13 +21,8 @@ import java.util.ArrayList; import javax.inject.Named; -import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FieldReference; -import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.Types; -import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -41,9 +36,6 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; -import org.apache.drill.exec.vector.BigIntVector; -import org.apache.drill.exec.expr.holders.BigIntHolder; -import org.apache.drill.exec.vector.allocator.FixedVectorAllocator; public abstract class HashTableTemplate implements HashTable { @@ -96,8 +88,6 @@ public abstract class HashTableTemplate implements HashTable { private MaterializedField dummyIntField; - private int outputCount = 0; - // This class encapsulates the links, keys and values for up to BATCH_SIZE // *unique* records. 
Thus, suppose there are N incoming record batches, each // of size BATCH_SIZE..but they have M unique keys altogether, the number of @@ -113,7 +103,8 @@ public abstract class HashTableTemplate implements HashTable { // Array of hash values - this is useful when resizing the hash table private IntVector hashValues; - int maxOccupiedIdx = -1; + private int maxOccupiedIdx = -1; + private int batchOutputCount = 0; private BatchHolder(int idx) { @@ -269,16 +260,16 @@ public abstract class HashTableTemplate implements HashTable { */ for (int i = 0; i <= maxOccupiedIdx; i++) { - if (outputRecordKeys(i, outputCount) ) { - if (EXTRA_DEBUG) logger.debug("Outputting keys to {}", outputCount) ; + if (outputRecordKeys(i, batchOutputCount) ) { + if (EXTRA_DEBUG) logger.debug("Outputting keys to output index: {}", batchOutputCount) ; // debugging // holder.value = vv0.getAccessor().get(i); // if (holder.value == 100018 || holder.value == 100021) { - // logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i, outputCount); + // logger.debug("Outputting key = {} at index - {} to outgoing index = {}.", holder.value, i, batchOutputCount); // } - outputCount++; + batchOutputCount++; } else { return false; }