DRILL-1116: Flush batch when copy fails due to insufficient space
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/de00d6f5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/de00d6f5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/de00d6f5 Branch: refs/heads/master Commit: de00d6f52e98432280bf8d509fc1ecee79a365a8 Parents: 84dc2de Author: Steven Phillips <sphill...@maprtech.com> Authored: Tue Jul 8 00:16:56 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Tue Jul 8 03:20:37 2014 -0700 ---------------------------------------------------------------------- .../exec/physical/impl/partitionsender/PartitionerTemplate.java | 5 +++++ 1 file changed, 5 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/de00d6f5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 8b63c5c..0fe3f15 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -152,6 +152,7 @@ public abstract class PartitionerTemplate implements Partitioner { outgoingBatch.flush(); if (!outgoingBatch.copy(recordId)) { logger.debug(RECORD_TOO_BIG_MSG, recordId); + throw new IOException(RECORD_TOO_BIG_MSG); } } } @@ -163,8 +164,10 @@ public abstract class PartitionerTemplate implements Partitioner { OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); if (!outgoingBatch.copy(svIndex)) { logger.trace(REWRITE_MSG, svIndex); + outgoingBatch.flush(); if (!outgoingBatch.copy(svIndex)) { logger.debug(RECORD_TOO_BIG_MSG, recordId); + throw new IOException(RECORD_TOO_BIG_MSG); } } } @@ -176,8 +179,10 @@ public abstract class PartitionerTemplate implements Partitioner { OutgoingRecordBatch outgoingBatch = outgoingBatches.get(doEval(svIndex)); if (!outgoingBatch.copy(svIndex)) { logger.trace(REWRITE_MSG, svIndex); + outgoingBatch.flush(); if (!outgoingBatch.copy(svIndex)) { logger.debug(RECORD_TOO_BIG_MSG, recordId); + throw new IOException(RECORD_TOO_BIG_MSG); } } }