DRILL-3061: Fix memory leaks in TestDrillbitResilience; fix a race condition in WorkEventBus; mark TestDrillbitResilience with @Ignore
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bef60f58 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bef60f58 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bef60f58 Branch: refs/heads/master Commit: bef60f58a79b6bb41c06d59897577d7f7017c526 Parents: c5f1c83 Author: adeneche <[email protected]> Authored: Tue May 12 10:53:32 2015 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu May 14 21:58:43 2015 -0700 ---------------------------------------------------------------------- .../drill/exec/rpc/control/WorkEventBus.java | 16 +- .../apache/drill/exec/rpc/data/DataServer.java | 2 +- .../org/apache/drill/exec/ZookeeperHelper.java | 14 +- .../exec/server/TestDrillbitResilience.java | 190 ++++++++++++++++--- 4 files changed, 190 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index ddd7828..3e461ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -90,16 +90,16 @@ public class WorkEventBus { } public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException { - // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message. - if (recentlyFinishedFragments.asMap().containsKey(handle)) { - if (logger.isDebugEnabled()) { - logger.debug("Fragment: {} was cancelled. 
Ignoring fragment handle", handle); + synchronized (this) { + // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message. + if (recentlyFinishedFragments.asMap().containsKey(handle)) { + if (logger.isDebugEnabled()) { + logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle); + } + return null; } - return null; - } - // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable. - synchronized (this) { + // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable. final FragmentManager m = managers.get(handle); if (m != null) { return m; http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 0d4077e..061ddcb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -150,7 +150,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { logger.error("Failure while getting fragment manager. 
{}", QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(), fragmentBatch.getReceivingMajorFragmentId(), - fragmentBatch.getReceivingMinorFragmentIdList())); + fragmentBatch.getReceivingMinorFragmentIdList()), e); ack.clear(); sender.send(new Response(RpcType.ACK, Acks.FAIL)); } finally { http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java index a5db81d..630c81b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java @@ -46,10 +46,22 @@ public class ZookeeperHelper { * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist. */ public ZookeeperHelper() { + this(false); + } + + /** + * Constructor. + * + * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist. + * @param failureInCancelled pass true if you want failures in cancelled fragments to be reported as failures + */ + public ZookeeperHelper(boolean failureInCancelled) { final Properties overrideProps = new Properties(); // Forced to disable this, because currently we leak memory which is a known issue for query cancellations. // Setting this causes unittests to fail. 
- // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true"); + if (failureInCancelled) { + overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true"); + } config = DrillConfig.create(overrideProps); zkUrl = config.getString(ExecConstants.ZK_CONNECTION); http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java index 8552ec1..696aed8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java @@ -30,8 +30,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.common.base.Preconditions; import org.apache.commons.math3.util.Pair; +import org.apache.drill.BaseTestQuery; import org.apache.drill.QueryTestUtil; import org.apache.drill.SingleRowListener; import org.apache.drill.common.AutoCloseables; @@ -48,11 +48,9 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.physical.impl.ScreenCreator; import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec; -import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator; -import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch; import 
org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch; import org.apache.drill.exec.planner.sql.DrillSqlWorker; @@ -88,11 +86,14 @@ import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; +import com.google.common.base.Preconditions; + /** * Test how resilient drillbits are to throwing exceptions during various phases of query * execution by injecting exceptions at various points and to cancellations in various phases. * The test cases are mentioned in DRILL-2383. */ +@Ignore public class TestDrillbitResilience extends DrillTest { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class); @@ -164,7 +165,7 @@ public class TestDrillbitResilience extends DrillTest { // turn off the HTTP server to avoid port conflicts between the drill bits System.setProperty(ExecConstants.HTTP_ENABLE, "false"); - zkHelper = new ZookeeperHelper(); + zkHelper = new ZookeeperHelper(true); zkHelper.startZookeeper(1); // use a non-null service set so that the drillbits can use port hunting @@ -269,7 +270,6 @@ public class TestDrillbitResilience extends DrillTest { assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty()); } - @SuppressWarnings("static-method") @After public void checkDrillbits() { clearAllInjections(); // so that the drillbit check itself doesn't trigger anything @@ -355,6 +355,8 @@ public class TestDrillbitResilience extends DrillTest { @Test public void settingNoopInjectionsAndQuery() { + final long before = countAllocatedMemory(); + final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA); setControls(controls); try { @@ -362,6 +364,9 @@ public class TestDrillbitResilience extends DrillTest { } catch (final Exception e) { fail(e.getMessage()); } + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are 
leaking %d bytes", after - before), before, after); } /** @@ -381,16 +386,24 @@ public class TestDrillbitResilience extends DrillTest { } } - @SuppressWarnings("static-method") @Test public void foreman_runTryBeginning() { + final long before = countAllocatedMemory(); + testForeman("run-try-beginning"); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } - @SuppressWarnings("static-method") @Test public void foreman_runTryEnd() { + final long before = countAllocatedMemory(); + testForeman("run-try-end"); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } /** @@ -463,7 +476,7 @@ public class TestDrillbitResilience extends DrillTest { } result.release(); } - }; + } /** * Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down. @@ -537,7 +550,11 @@ public class TestDrillbitResilience extends DrillTest { * Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state. */ private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) { - assertCancelled(controls, TEST_QUERY, listener); + assertCancelledWithoutException(controls, listener, TEST_QUERY); + } + + private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener, final String query) { + assertCancelled(controls, query, listener); } /** @@ -579,6 +596,9 @@ public class TestDrillbitResilience extends DrillTest { @Test // To test pause and resume. Test hangs if resume did not happen. 
public void passThrough() { + final long before = countAllocatedMemory(); + + final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { @Override public void queryIdArrived(final QueryId queryId) { @@ -595,11 +615,16 @@ public class TestDrillbitResilience extends DrillTest { QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener); final Pair<QueryState, Exception> result = listener.waitForCompletion(); assertCompleteState(result, QueryState.COMPLETED); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } @Test // Cancellation TC 1: cancel before any result set is returned @Ignore // DRILL-3052 public void cancelBeforeAnyResultsArrive() { + final long before = countAllocatedMemory(); + final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { @Override @@ -611,10 +636,15 @@ public class TestDrillbitResilience extends DrillTest { final String controls = createPauseInjection(Foreman.class, "foreman-ready"); assertCancelledWithoutException(controls, listener); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } @Test // Cancellation TC 2: cancel in the middle of fetching result set public void cancelInMiddleOfFetchingResults() { + final long before = countAllocatedMemory(); + final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { private boolean cancelRequested = false; @@ -632,11 +662,16 @@ public class TestDrillbitResilience extends DrillTest { // skip once i.e. 
wait for one batch, so that #dataArrived above triggers #cancelAndResume final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1); assertCancelledWithoutException(controls, listener); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } @Test // Cancellation TC 3: cancel after all result set are produced but not all are fetched public void cancelAfterAllResultsProduced() { + final long before = countAllocatedMemory(); + final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { private int count = 0; @@ -652,10 +687,16 @@ public class TestDrillbitResilience extends DrillTest { final String controls = createPauseInjection(ScreenCreator.class, "send-complete"); assertCancelledWithoutException(controls, listener); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } @Test // Cancellation TC 4: cancel after everything is completed and fetched + @Ignore public void cancelAfterEverythingIsCompleted() { + final long before = countAllocatedMemory(); + final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { private int count = 0; @@ -671,16 +712,25 @@ public class TestDrillbitResilience extends DrillTest { final String controls = createPauseInjection(Foreman.class, "foreman-cleanup"); assertCancelledWithoutException(controls, listener); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } @Test // Completion TC 1: success public void successfullyCompletes() { + final long before = countAllocatedMemory(); + final WaitUntilCompleteListener listener = new WaitUntilCompleteListener(); QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener); final Pair<QueryState, Exception> result = listener.waitForCompletion(); 
assertCompleteState(result, QueryState.COMPLETED); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } + /** * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc. */ @@ -702,26 +752,41 @@ public class TestDrillbitResilience extends DrillTest { @Test // Completion TC 2: failed query - before query is executed - while sql parsing public void failsWhenParsing() { + final long before = countAllocatedMemory(); + final String exceptionDesc = "sql-parsing"; final Class<? extends Throwable> exceptionClass = ForemanSetupException.class; final String controls = createSingleException(DrillSqlWorker.class, exceptionDesc, exceptionClass); assertFailsWithException(controls, exceptionClass, exceptionDesc); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } @Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits public void failsWhenSendingFragments() { + final long before = countAllocatedMemory(); + final String exceptionDesc = "send-fragments"; final Class<? extends Throwable> exceptionClass = ForemanException.class; final String controls = createSingleException(Foreman.class, exceptionDesc, exceptionClass); assertFailsWithException(controls, exceptionClass, exceptionDesc); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } @Test // Completion TC 4: failed query - during query execution public void failsDuringExecution() { + final long before = countAllocatedMemory(); + final String exceptionDesc = "fragment-execution"; final Class<? 
extends Throwable> exceptionClass = IOException.class; final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass); assertFailsWithException(controls, exceptionClass, exceptionDesc); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } /** @@ -730,8 +795,13 @@ public class TestDrillbitResilience extends DrillTest { */ @Test public void testInterruptingBlockedMergingRecordBatch() { + final long before = countAllocatedMemory(); + final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1); testInterruptingBlockedFragmentsWaitingForData(control); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } /** @@ -740,8 +810,13 @@ public class TestDrillbitResilience extends DrillTest { */ @Test public void testInterruptingBlockedUnorderedReceiverBatch() { + final long before = countAllocatedMemory(); + final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1); testInterruptingBlockedFragmentsWaitingForData(control); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } private static void testInterruptingBlockedFragmentsWaitingForData(final String control) { @@ -769,22 +844,27 @@ public class TestDrillbitResilience extends DrillTest { setSessionOption(HASHAGG.getOptionName(), "true"); setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6"); + final long before = countAllocatedMemory(); + final String controls = "{\"injections\" : [" - + "{" - + "\"type\" : \"latch\"," - + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\"," - + "\"desc\" : \"partitioner-sender-latch\"" - + "}," - + "{" - + "\"type\" : \"pause\"," - + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\"," - + 
"\"desc\" : \"wait-for-fragment-interrupt\"," - + "\"nSkip\" : 1" - + "}" + - "]}"; + + "{" + + "\"type\" : \"latch\"," + + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\"," + + "\"desc\" : \"partitioner-sender-latch\"" + + "}," + + "{" + + "\"type\" : \"pause\"," + + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\"," + + "\"desc\" : \"wait-for-fragment-interrupt\"," + + "\"nSkip\" : 1" + + "}" + + "]}"; final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city"; assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData()); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); } finally { setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT)); setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString()); @@ -795,9 +875,75 @@ public class TestDrillbitResilience extends DrillTest { @Test public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception { + + final long before = countAllocatedMemory(); + final String control = - createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1); + createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1); assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData()); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); + } + + @Test + public void memoryLeaksWhenCancelled() { + setSessionOption(SLICE_TARGET, "10"); + + final long before = countAllocatedMemory(); + + final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1); + String query = null; + try { + query = BaseTestQuery.getFile("queries/tpch/09.sql"); + } catch (final IOException e) { + fail("Failed to get query file: 
" + e); + } + + final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() { + private boolean cancelRequested = false; + + @Override + public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) { + if (!cancelRequested) { + check(queryId != null, "Query id should not be null, since we have waited long enough."); + cancelAndResume(); + cancelRequested = true; + } + result.release(); + } + }; + + assertCancelledWithoutException(controls, listener, query.substring(0, query.length() - 1)); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); + + setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT)); + } + + @Test + public void memoryLeaksWhenFailed() { + setSessionOption(SLICE_TARGET, "10"); + + final long before = countAllocatedMemory(); + + final String exceptionDesc = "fragment-execution"; + final Class<? extends Throwable> exceptionClass = IOException.class; + final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass); + String query = null; + try { + query = BaseTestQuery.getFile("queries/tpch/09.sql"); + } catch (final IOException e) { + fail("Failed to get query file: " + e); + } + + assertFailsWithException(controls, exceptionClass, exceptionDesc, query.substring(0, query.length() - 1)); + + final long after = countAllocatedMemory(); + assertEquals(String.format("We are leaking %d bytes", after - before), before, after); + + setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT)); } @Test // DRILL-3065
