Repository: drill
Updated Branches:
  refs/heads/master c1b847acd -> 403dc5cdc

DRILL-3147: tpcds-sf1-parquet query 73 causes memory leak

- each time a fragment A sends a "receiver finished" to fragment B, fragment B's id is added to the FragmentContext.ignoredSenders list
- refactored UnorderedReceiverBatch.informSenders() and MergingRecordBatch.informSenders() by moving this method to FragmentContext
- DataServer.send() uses FragmentContext.ignoredSenders to decide if a batch should be passed to the fragment or discarded right away
- BaseRawBatchBuffer methods enqueue() and kill() are now synchronized
- the TestTpcdsSf1Leaks test reproduces the leak; it is ignored by default because it requires a large dataset


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/403dc5cd
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/403dc5cd
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/403dc5cd

Branch: refs/heads/master
Commit: 403dc5cdc6d34b24c1caca7cb574eb9e9727afe4
Parents: c1b847a
Author: adeneche <[email protected]>
Authored: Mon May 18 18:02:12 2015 -0700
Committer: Jason Altekruse <[email protected]>
Committed: Thu Jun 25 12:01:00 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/rpc/data/DataServer.java  | 41 +++--------
 .../exec/work/batch/BaseRawBatchBuffer.java     | 11 ++-
 .../exec/work/fragment/RootFragmentManager.java |  2 +-
 .../java/org/apache/drill/BaseTestQuery.java    | 14 ++--
 .../drill/exec/server/TestTpcdsSf1Leaks.java    | 75 ++++++++++++++++++++
 .../src/test/resources/tpcds-sf1/q73.sql        | 27 +++++++
 6 files changed, 129 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 80d2d6e..4908c18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -122,7 +122,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
     final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
     final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
 
-    Pointer<DrillBuf> out = new Pointer<DrillBuf>();
     AckSender ack = new AckSender(sender);
     // increment so we don't get false returns.
     ack.increment();
@@ -139,12 +138,8 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
         }
       }else{
-        if (targetCount > 1) {
-          for (int minor = 0; minor < targetCount; minor++) {
-            send(fragmentBatch, (DrillBuf) body, minor, ack, true);
-          }
-        } else {
-          send(fragmentBatch, (DrillBuf) body, 0, ack, false);
+        for (int minor = 0; minor < targetCount; minor++) {
+          send(fragmentBatch, (DrillBuf) body, minor, ack);
         }
       }
     } catch (IOException | FragmentSetupException e) {
@@ -158,34 +153,23 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
       // decrement the extra reference we grabbed at the top.
      ack.sendOk();
-      if(out != null && out.value != null){
-        out.value.release();
-      }
     }
   }
 
-  private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int minor, final AckSender ack,
-      final boolean shared)
+  private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int minor, final AckSender ack)
       throws FragmentSetupException, IOException {
-    FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
+    final FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
     if (manager == null) {
       return;
     }
 
     final BufferAllocator allocator = manager.getFragmentContext().getAllocator();
-    final Pointer<DrillBuf> out = new Pointer<DrillBuf>();
+    final Pointer<DrillBuf> out = new Pointer<>();
     final boolean withinMemoryEnvelope;
-    final DrillBuf submitBody;
-
-    if (shared) {
-      withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
-      submitBody = out.value;
-    }else{
-      withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body.unwrap());
-      submitBody = body;
-    }
+
+    withinMemoryEnvelope = allocator.takeOwnership(body, out);
 
     if (!withinMemoryEnvelope) {
       // if we over reserved, we need to add poison pill before batch.
@@ -193,14 +177,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
     }
 
     ack.increment();
-    dataHandler.handle(manager, fragmentBatch, submitBody, ack);
-
-    if (shared) {
-      // make sure to release the reference count we have to the new buffer.
-      // dataHandler.handle should have taken any ownership it needed.
-      out.value.release();
-    }
+    dataHandler.handle(manager, fragmentBatch, out.value, ack);
+    // make sure to release the reference count we have to the new buffer.
+    // dataHandler.handle should have taken any ownership it needed.
+    out.value.release();
   }
 
   private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {


http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 11b6cc8..fbffd87 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -26,9 +26,9 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RawFragmentBatch;
 
 public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
 
-  private static enum BufferState {
+  private enum BufferState {
     INIT,
     STREAMS_FINISHED,
     KILLED
@@ -61,7 +61,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
   }
 
   @Override
-  public void enqueue(final RawFragmentBatch batch) throws IOException {
+  public synchronized void enqueue(final RawFragmentBatch batch) throws IOException {
    // if this fragment is already canceled or failed, we shouldn't need any more stuff. We do the null check to
    // ensure that tests run.
@@ -113,8 +113,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
   public void close() {
     if (!isTerminated() && context.shouldContinue()) {
       final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
-      final IllegalStateException e = new IllegalStateException(msg);
-      throw e;
+      throw new IllegalStateException(msg);
     }
 
     if (!bufferQueue.isEmpty()) {
@@ -127,7 +126,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
   }
 
   @Override
-  public void kill(final FragmentContext context) {
+  public synchronized void kill(final FragmentContext context) {
     state = BufferState.KILLED;
     clearBufferWithBody();
   }


http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index b770a33..f4f76dd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 // TODO a lot of this is the same as NonRootFragmentManager
 public class RootFragmentManager implements FragmentManager {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
 
   private final IncomingBuffers buffers;
   private final FragmentExecutor runner;


http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 3d09d6a..46186df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URL;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -47,11 +46,8 @@ import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.util.JsonStringArrayList;
-import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.util.TestUtilities;
 import org.apache.drill.exec.util.VectorUtil;
-import org.apache.hadoop.io.Text;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.rules.TestRule;
@@ -65,6 +61,7 @@ import com.google.common.io.Resources;
 import static org.hamcrest.core.StringContains.containsString;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 public class BaseTestQuery extends ExecTest {
   private static final org.slf4j.Logger logger =
      org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
@@ -393,6 +390,15 @@ public class BaseTestQuery extends ExecTest {
     return dir.getAbsolutePath() + File.separator + dirName;
   }
 
+  protected static void setSessionOption(final String option, final String value) {
+    try {
+      runSQL(String.format("alter session set `%s` = %s", option, value));
+    } catch(final Exception e) {
+      fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value, e.toString()));
+    }
+  }
+
   private static class SilentListener implements UserResultsListener {
     private volatile UserException exception;
     private AtomicInteger count = new AtomicInteger();


http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
new file mode 100644
index 0000000..ba19e0d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestTpcdsSf1Leaks.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.server;
+
+import org.apache.drill.BaseTestQuery;
+
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.junit.Assert.fail;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+/**
+ * To run this unit class you need to download the following data file:
+ *   http://apache-drill.s3.amazonaws.com/files/tpcds-sf1-parquet.tgz
+ * and untar it in some folder
+ * (e.g. /tpcds-sf1-parquet) then add the following workspace to
+ * exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+ *
+ *   ,"tpcds" : {
+ *     location: "/tpcds-sf1-parquet",
+ *     writable: false
+ *   }
+ *
+ */
+@Ignore
+public class TestTpcdsSf1Leaks extends BaseTestQuery {
+
+  @Rule
+  final public TestRule TIMEOUT = new Timeout(0); // wait forever
+
+  @BeforeClass
+  public static void initCluster() {
+    updateTestCluster(3, null);
+  }
+
+  @Test
+  public void test() throws Exception {
+    setSessionOption(SLICE_TARGET, "10");
+    try {
+      final String query = getFile("tpcds-sf1/q73.sql");
+      for (int i = 0; i < 20; i++) {
+        System.out.printf("%nRun #%d%n", i+1);
+
+        try {
+          runSQL(query);
+        } catch (final Exception e) {
+          fail("query failed: " + e.getMessage());
+        }
+      }
+    }finally {
+      setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+    }
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/drill/blob/403dc5cd/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql b/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
new file mode 100644
index 0000000..094ca2b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/tpcds-sf1/q73.sql
@@ -0,0 +1,27 @@
+select c.c_last_name,
+       c.c_first_name,
+       c.c_salutation,
+       c.c_preferred_cust_flag,
+       dj.sstn,
+       dj.cnt
+from (
+  select ss.ss_ticket_number as sstn, ss.ss_customer_sk as sscsk, count(*) cnt
+  from dfs_test.tpcds.store_sales as ss,
+       dfs_test.tpcds.date_dim as d,
+       dfs_test.tpcds.store as s,
+       dfs_test.tpcds.household_demographics as hd
+  where ss.ss_sold_date_sk = d.d_date_sk
+    and ss.ss_store_sk = s.s_store_sk
+    and ss.ss_hdemo_sk = hd.hd_demo_sk
+    and (hd.hd_buy_potential = '>10000' or hd.hd_buy_potential = 'unknown')
+    and hd.hd_vehicle_count > 0
+    and case when hd.hd_vehicle_count > 0 then hd.hd_dep_count / hd.hd_vehicle_count else null end > 1
+    and s.s_county in ('Saginaw County', 'Sumner County', 'Appanoose County', 'Daviess County')
+    and ss.ss_sold_date_sk between 2451180 and 2451269
+  group by ss.ss_ticket_number, ss.ss_customer_sk
+) dj,
+  dfs_test.tpcds.customer as c
+where dj.sscsk = c.c_customer_sk
+  and dj.cnt between 1 and 5
+order by dj.cnt desc
+limit 1000
\ No newline at end of file
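
----------------------------------------------------------------------

Note: the commit message above describes the fix at a high level: once a receiving fragment has sent "receiver finished" to a sender, that sender is remembered in the receiver's FragmentContext, and DataServer discards any batch that still arrives from it instead of queueing it. The standalone Java sketch below illustrates only that idea; the names (ReceiverContext, addIgnoredSender, deliver, Buffer) are simplified stand-ins and do not match Drill's actual classes or method signatures.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustration of the "ignored senders" idea: when receiver fragment A tells
 * sender fragment B "receiver finished", B's id is remembered in A's context,
 * and any batch from B that still arrives for A is released immediately
 * instead of being queued (and leaked).
 */
public class IgnoredSendersSketch {

  /** Stand-in for the receiving fragment's FragmentContext. */
  static final class ReceiverContext {
    private final Set<Integer> ignoredSenders = ConcurrentHashMap.newKeySet();

    /** Called after the receiver has sent "receiver finished" to this sender. */
    void addIgnoredSender(int senderFragmentId) {
      ignoredSenders.add(senderFragmentId);
    }

    boolean isIgnored(int senderFragmentId) {
      return ignoredSenders.contains(senderFragmentId);
    }
  }

  /** Stand-in for a reference-counted buffer such as DrillBuf. */
  static final class Buffer {
    void release() {
      System.out.println("buffer released");
    }
  }

  /** Stand-in for the DataServer-side decision: pass the batch on or discard it. */
  static void deliver(ReceiverContext receiver, int senderFragmentId, Buffer body) {
    if (receiver.isIgnored(senderFragmentId)) {
      body.release();   // late batch from a sender we already told to stop: drop it
      return;
    }
    System.out.println("batch from sender " + senderFragmentId + " enqueued");
  }

  public static void main(String[] args) {
    ReceiverContext receiver = new ReceiverContext();

    deliver(receiver, 2, new Buffer());   // sender 2 still active: batch is enqueued

    receiver.addIgnoredSender(2);         // receiver finished, informs sender 2
    deliver(receiver, 2, new Buffer());   // in-flight batch arrives late: released
  }
}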
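The commit also makes BaseRawBatchBuffer.enqueue() and kill() synchronized. The sketch below shows, under simplified assumptions, the kind of race that serializing the two methods presumably avoids: without mutual exclusion, a batch could be queued concurrently with kill() clearing the queue and never be released. All names here are hypothetical stand-ins, not Drill's real classes.

import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Minimal sketch of a batch buffer whose enqueue() and kill() are serialized.
 * Either a batch is cleared together with the queue, or enqueue() observes the
 * KILLED state and releases the batch immediately; it can never be stranded.
 */
public class SynchronizedBufferSketch {

  enum State { INIT, KILLED }

  static final class Batch {
    void release() {
      System.out.println("batch released");
    }
  }

  private final Deque<Batch> queue = new ArrayDeque<>();
  private State state = State.INIT;

  /** A batch arriving after kill() is released instead of being queued and leaked. */
  public synchronized void enqueue(Batch batch) {
    if (state == State.KILLED) {
      batch.release();
      return;
    }
    queue.add(batch);
  }

  /** Cancels the buffer and releases everything that was queued so far. */
  public synchronized void kill() {
    state = State.KILLED;
    while (!queue.isEmpty()) {
      queue.poll().release();
    }
  }

  public static void main(String[] args) {
    SynchronizedBufferSketch buffer = new SynchronizedBufferSketch();
    buffer.enqueue(new Batch());   // queued normally
    buffer.kill();                 // releases the queued batch
    buffer.enqueue(new Batch());   // late arrival: released immediately
  }
}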
