This is an automated email from the ASF dual-hosted git repository.
luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new e07aa78 DRILL-8030: Intermittent TestDrillbitResilience
cancelInMiddleOfFetchingResults and foreman_runTryEnd failures
e07aa78 is described below
commit e07aa7816358fb525a8d896668bc00d991a47149
Author: Vitalii Diravka <[email protected]>
AuthorDate: Wed Nov 3 12:22:23 2021 +0200
DRILL-8030: Intermittent TestDrillbitResilience
cancelInMiddleOfFetchingResults and foreman_runTryEnd failures
---
.../drill/exec/server/TestDrillbitResilience.java | 106 ++++++++++++---------
1 file changed, 60 insertions(+), 46 deletions(-)
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index fed9c4d..47cc934 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -98,8 +98,13 @@ import org.slf4j.Logger;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
- * Test how resilient drillbits are to throwing exceptions during various
phases of query
- * execution by injecting exceptions at various points, and to cancellations
in various phases.
+ * <p>Test how resilient drillbits are to throwing exceptions during various
phases of query
+ * execution by injecting exceptions at various points, and to cancellations
in various phases.</p>
+ *
+ * <p>Note: to debug this test case:</p>
+ * <ul><li>set PROBLEMATIC_TEST_NUM_RUNS (and possibly NUM_RUNS) to 1000 for
RepeatedTest</li>
+ * <li>set CURRENT_LOG_LEVEL to Level.DEBUG</li>
+ * <li>compare the trace output of a successful run with that of a failed run</li></ul>
*/
@Tag(SlowTest.TAG)
@Tag(FlakyTest.TAG)
@@ -113,7 +118,7 @@ public class TestDrillbitResilience extends ClusterTest {
private static final int NUM_RUNS = 3;
private static final int PROBLEMATIC_TEST_NUM_RUNS = 3;
private static final int TIMEOUT = 10;
- private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+ private final static Level CURRENT_LOG_LEVEL = Level.DEBUG;
/**
* Note: Counting sys.memory executes a fragment on every drillbit. This is
a better check in comparison to
@@ -281,7 +286,7 @@ public class TestDrillbitResilience extends ClusterTest {
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
}
- @Test // DRILL-3163, DRILL-3167
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3163,
DRILL-3167, DRILL-7973, DRILL-8030
@Timeout(value = TIMEOUT)
public void foreman_runTryEnd() {
final long before = countAllocatedMemory();
@@ -320,10 +325,9 @@ public class TestDrillbitResilience extends ClusterTest {
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
}
- // DRILL-3052: Since root fragment is waiting on data and leaf fragments are
cancelled before they send any
- // data to root, root will never run. This test will timeout if the root did
not send the final state to Foreman.
+
// DRILL-2383: Cancellation TC 1: cancel before any result set is returned.
- @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3192
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3052,
DRILL-3192
@Timeout(value = TIMEOUT)
public void cancelWhenQueryIdArrives() {
final long before = countAllocatedMemory();
@@ -333,7 +337,7 @@ public class TestDrillbitResilience extends ClusterTest {
@Override
public void queryIdArrived(final QueryId queryId) {
super.queryIdArrived(queryId);
- cancelAndResume(true);
+ cancelAndResume();
}
};
@@ -346,8 +350,9 @@ public class TestDrillbitResilience extends ClusterTest {
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
}
- @RepeatedTest(NUM_RUNS) // DRILL-6228
- @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 2: cancel in the
middle of fetching result set
+ // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-6228,
DRILL-8030
+ @Timeout(value = TIMEOUT)
public void cancelInMiddleOfFetchingResults() {
final long before = countAllocatedMemory();
@@ -358,7 +363,7 @@ public class TestDrillbitResilience extends ClusterTest {
public void dataArrived(final QueryDataBatch result, final
ConnectionThrottle throttle) {
if (!cancelRequested) {
check(queryId != null, "Query id should not be null, since we have
waited long enough.");
- cancelAndResume(false);
+ cancelAndResume();
cancelRequested = true;
}
result.release();
@@ -375,20 +380,19 @@ public class TestDrillbitResilience extends ClusterTest {
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
}
-
- @RepeatedTest(NUM_RUNS) // DRILL-6228
- @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 3: cancel after all
result set are produced but not all are fetched
+ // DRILL-2383: Cancellation TC 3: cancel after all result set are produced
but not all are fetched
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-6228,
DRILL-8030
+ @Timeout(value = TIMEOUT)
public void cancelAfterAllResultsProduced() {
final long before = countAllocatedMemory();
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener()
{
- private int count = 0;
@Override
public void dataArrived(final QueryDataBatch result, final
ConnectionThrottle throttle) {
if (lastDrillbit()) {
check(queryId != null, "Query id should not be null, since we have
waited long enough.");
- cancelAndResume(false);
+ cancelAndResume();
}
result.release();
}
@@ -403,8 +407,9 @@ public class TestDrillbitResilience extends ClusterTest {
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
}
- @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3967
- @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 4: cancel after
everything is completed and fetched
+ // DRILL-2383: Cancellation TC 4: cancel after everything is completed and
fetched
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3967,
DRILL-7973
+ @Timeout(value = TIMEOUT)
public void cancelAfterEverythingIsCompleted() {
final long before = countAllocatedMemory();
@@ -416,7 +421,7 @@ public class TestDrillbitResilience extends ClusterTest {
check(queryId != null, "Query id should not be null, since we have
waited long enough.");
// need to wait until all batches are processed, since
foreman-cleanup - the pause that happened earlier,
// than the client accepts queryCompleted() from UserResultsListener
- cancelAndResume(true);
+ cancelAndResume();
}
result.release();
}
@@ -457,7 +462,7 @@ public class TestDrillbitResilience extends ClusterTest {
final String controls = Controls.newBuilder()
.addException(DrillSqlWorker.class, exceptionDesc, exceptionClass, 0, 2)
.build();
- assertFailsWithException(controls, exceptionClass, exceptionDesc);
+ assertFailsWithException(controls, exceptionClass, exceptionDesc,
TEST_QUERY);
final long after = countAllocatedMemory();
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
@@ -473,7 +478,7 @@ public class TestDrillbitResilience extends ClusterTest {
final String controls = Controls.newBuilder()
.addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
.build();
- assertFailsWithException(controls, exceptionClass, exceptionDesc);
+ assertFailsWithException(controls, exceptionClass, exceptionDesc,
TEST_QUERY);
final long after = countAllocatedMemory();
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
@@ -489,7 +494,7 @@ public class TestDrillbitResilience extends ClusterTest {
final String controls = Controls.newBuilder()
.addException(FragmentExecutor.class, exceptionDesc, exceptionClass)
.build();
- assertFailsWithException(controls, exceptionClass, exceptionDesc);
+ assertFailsWithException(controls, exceptionClass, exceptionDesc,
TEST_QUERY);
final long after = countAllocatedMemory();
assertEquals(before, after, () -> String.format("We are leaking %d bytes",
after - before));
@@ -564,7 +569,7 @@ public class TestDrillbitResilience extends ClusterTest {
}
}
- @Test // DRILL-3193
+ @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3193,
DRILL-7973
@Timeout(value = TIMEOUT)
public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() {
final long before = countAllocatedMemory();
@@ -604,7 +609,7 @@ public class TestDrillbitResilience extends ClusterTest {
public void dataArrived(final QueryDataBatch result, final
ConnectionThrottle throttle) {
if (!cancelRequested) {
check(queryId != null, "Query id should not be null, since we have
waited long enough.");
- cancelAndResume(false);
+ cancelAndResume();
cancelRequested = true;
}
result.release();
@@ -620,7 +625,7 @@ public class TestDrillbitResilience extends ClusterTest {
}
}
- @RepeatedTest(NUM_RUNS) // DRILL-3194
+ @RepeatedTest(NUM_RUNS) // for more info: DRILL-3194, DRILL-7973
@Timeout(value = TIMEOUT)
public void memoryLeaksWhenFailed() {
client.alterSession(SLICE_TARGET, "10");
@@ -653,7 +658,7 @@ public class TestDrillbitResilience extends ClusterTest {
}
@Test
- @Timeout(value = TIMEOUT) // DRILL-3065
+ @Timeout(value = TIMEOUT) // for more info: DRILL-3065
public void failsAfterMSorterSorting() {
// Note: must use an input table that returns more than one
@@ -675,7 +680,7 @@ public class TestDrillbitResilience extends ClusterTest {
}
@Test
- @Timeout(value = TIMEOUT) // DRILL-3085
+ @Timeout(value = TIMEOUT) // for more info: DRILL-3085
public void failsAfterMSorterSetup() {
// Note: must use an input table that returns more than one
@@ -719,10 +724,10 @@ public class TestDrillbitResilience extends ClusterTest {
/**
* Method that cancels and resumes the query, in order.
*/
- protected final void cancelAndResume(boolean sleepBeforeStart) {
+ protected final void cancelAndResume() {
Preconditions.checkNotNull(queryId);
final ExtendedLatch trigger = new ExtendedLatch(1);
- (new CancellingThread(queryId, ex, trigger, sleepBeforeStart)).start();
+ (new CancellingThread(queryId, ex, trigger)).start();
(new ResumingThread(queryId, ex, trigger)).start();
}
@@ -779,7 +784,7 @@ public class TestDrillbitResilience extends ClusterTest {
if (!cancelRequested) {
logger.debug("First batch arrived, so cancelling thread started");
check(queryId != null, "Query id should not be null, since we have
waited long enough.");
- (new CancellingThread(queryId, ex, null, false)).start();
+ (new CancellingThread(queryId, ex, null)).start();
cancelRequested = true;
}
result.release();
@@ -793,25 +798,21 @@ public class TestDrillbitResilience extends ClusterTest {
private final QueryId queryId;
private final Pointer<Exception> ex;
private final ExtendedLatch latch;
- private final boolean sleepBeforeStart;
- public CancellingThread(QueryId queryId, Pointer<Exception> ex,
ExtendedLatch latch, boolean sleepBeforeStart) {
+ public CancellingThread(QueryId queryId, Pointer<Exception> ex,
ExtendedLatch latch) {
this.queryId = queryId;
this.ex = ex;
this.latch = latch;
- this.sleepBeforeStart = sleepBeforeStart;
logger.debug("Cancelling thread created");
}
@Override
public void run() {
- if(sleepBeforeStart) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.debug("Sleep thread interrupted. Ignore it");
- // just ignore
- }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ logger.debug("Sleep thread interrupted. Ignore it");
+ // just ignore
}
logger.debug("Cancelling {} query started", queryId);
final DrillRpcFuture<Ack> cancelAck =
client.client().cancelQuery(queryId);
@@ -913,8 +914,8 @@ public class TestDrillbitResilience extends ClusterTest {
private long countAllocatedMemory() {
// wait to make sure all fragments finished cleaning up
try {
- logger.debug("Sleep thread for 0.05 seconds");
- Thread.sleep(500);
+ logger.debug("Sleep thread for 2 seconds");
+ Thread.sleep(2000);
} catch (InterruptedException e) {
logger.debug("Sleep thread interrupted. Ignore it", e);
// just ignore
@@ -955,9 +956,22 @@ public class TestDrillbitResilience extends ClusterTest {
assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
}
- private void assertFailsWithException(final String controls, final Class<?
extends Throwable> exceptionClass,
- final String exceptionDesc) {
- assertFailsWithException(controls, exceptionClass, exceptionDesc,
TEST_QUERY);
+ /**
+ * The same as {@link #assertFailsWithException}, but also allows COMPLETED
state for the query.<br>
+ * Note: in some cases the query completes before the run-try-end
exception is injected and thrown in
+ * Foreman. The COMPLETED state is fine for such cases.
+ */
+ private void assertFailsOrCompletedWithException(final String controls,
final Class<? extends Throwable> exceptionClass,
+ final String exceptionDesc, final
String query) {
+ setControls(controls);
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+ QueryTestUtil.testWithListener(client.client(), QueryType.SQL, query,
listener);
+ final Pair<QueryState, Exception> result = listener.waitForCompletion();
+ final QueryState state = result.getFirst();
+
+ assertTrue(state.equals(QueryState.FAILED) ||
state.equals(QueryState.COMPLETED),
+ () -> String.format("Query state should be FAILED (and not %s).",
state));
+ assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
}
/**
@@ -970,7 +984,7 @@ public class TestDrillbitResilience extends ClusterTest {
final String controls = Controls.newBuilder()
.addException(Foreman.class, desc, ForemanException.class)
.build();
- assertFailsWithException(controls, ForemanException.class, desc);
+ assertFailsOrCompletedWithException(controls, ForemanException.class,
desc, TEST_QUERY);
}
/**