DRILL-640: Fix memory leak in limit operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f2ff2c9d Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f2ff2c9d Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f2ff2c9d Branch: refs/heads/master Commit: f2ff2c9d2aad429f042da8250ca6e1ef1f160318 Parents: d870b6e Author: Mehant Baid <meha...@gmail.com> Authored: Mon May 5 14:48:18 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Wed May 7 18:43:13 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/limit/LimitRecordBatch.java | 12 +++ .../impl/limit/TestLimitWithExchanges.java | 29 +++++++ .../test/resources/limit/limit_exchanges.json | 87 ++++++++++++++++++++ 3 files changed, 128 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f2ff2c9d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 3f2ec27..ed56e79 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -81,6 +81,18 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { public IterOutcome next() { if(!noEndLimit && recordsLeft <= 0) { // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared + // Drain the incoming record batch and clear the memory + IterOutcome upStream = incoming.next(); + + while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) { + + // Clear the memory for the incoming batch + for (VectorWrapper<?> wrapper : incoming) { + wrapper.getValueVector().clear(); + } + upStream = incoming.next(); + } + return IterOutcome.NONE; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f2ff2c9d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java new file mode 100644 index 0000000..0e4d734 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.limit; + +import org.apache.drill.BaseTestQuery; +import org.junit.Test; + +public class TestLimitWithExchanges extends BaseTestQuery { + + @Test + public void testLimitWithExchanges() throws Exception{ + testPhysicalFromFile("limit/limit_exchanges.json"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f2ff2c9d/exec/java-exec/src/test/resources/limit/limit_exchanges.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/limit/limit_exchanges.json b/exec/java-exec/src/test/resources/limit/limit_exchanges.json new file mode 100644 index 0000000..5ad56be --- /dev/null +++ b/exec/java-exec/src/test/resources/limit/limit_exchanges.json @@ -0,0 +1,87 @@ +{ + "head" : { + "version" : 1, + "generator" : { + "type" : "DefaultSqlHandler", + "info" : "" + }, + "type" : "APACHE_DRILL_PHYSICAL", + "resultMode" : "EXEC" + }, + "graph" : [ { + "pop" : "parquet-scan", + "@id" : 1, + "entries" : [ { + "path" : "/tpch/nation.parquet" + } ], + "storage" : { + "type" : "file", + "connection" : "classpath:///", + "workspaces" : null, + "formats" : { + "psv" : { + "type" : "text", + "extensions" : [ "tbl" ], + "delimiter" : "|" + }, + "csv" : { + "type" : "text", + "extensions" : [ "csv" ], + "delimiter" : "," + }, + "tsv" : { + "type" : "text", + "extensions" : [ "tsv" ], + "delimiter" : "\t" + }, + "parquet" : { + "type" : "parquet" + }, + "json" : { + "type" : "json" + } + } + }, + "format" : { + "type" : "parquet" + }, + "selectionRoot" : "/Users/mbaid/sources/drill/tpch-work/sample-data/nationsMF" + }, { + "pop" : "project", + "@id" : 2, + "exprs" : [ { + "ref" : "`N_NATIONKEY`", + "expr" : "`N_NATIONKEY`" + } ], + "child" : 1 + }, { + "pop" : "hash-to-random-exchange", + "@id" : 3, + "child" : 2, + "expr" : "hash(`N_NATIONKEY`) " + }, { + "pop" : "union-exchange", + "@id" : 4, + "child" : 3 + }, +{ + "pop" : "project", + "@id" : 5, + "exprs" : [ { + "ref" : "`N_NATIONKEY`", + "expr" : "`N_NATIONKEY`" + } ], + "child" : 4 + }, +{ + "pop" : "limit", + "@id" : 6, + "child" : 5, + "first" : 0, + "last" : 1 + }, { + "pop" : "screen", + "@id" : 7, + "child" : 6 + } ] +}