This is an automated email from the ASF dual-hosted git repository.

luoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new e07aa78  DRILL-8030: Intermittent TestDrillbitResilience 
cancelInMiddleOfFetchingResults and foreman_runTryEnd failures
e07aa78 is described below

commit e07aa7816358fb525a8d896668bc00d991a47149
Author: Vitalii Diravka <[email protected]>
AuthorDate: Wed Nov 3 12:22:23 2021 +0200

    DRILL-8030: Intermittent TestDrillbitResilience 
cancelInMiddleOfFetchingResults and foreman_runTryEnd failures
---
 .../drill/exec/server/TestDrillbitResilience.java  | 106 ++++++++++++---------
 1 file changed, 60 insertions(+), 46 deletions(-)

diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index fed9c4d..47cc934 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -98,8 +98,13 @@ import org.slf4j.Logger;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 
 /**
- * Test how resilient drillbits are to throwing exceptions during various 
phases of query
- * execution by injecting exceptions at various points, and to cancellations 
in various phases.
+ * <p>Test how resilient drillbits are to throwing exceptions during various 
phases of query
+ * execution by injecting exceptions at various points, and to cancellations 
in various phases.</p>
+ *
+ * Note: to debug this test case:<br>
+ * <li>use 1000 value for PROBLEMATIC_TEST_NUM_RUNS and possibly NUM_RUNS for 
RepeatedTest</li>
+ * <li>specify Level.DEBUG for CURRENT_LOG_LEVEL</li>
+ * <li>compare trace output for successful test case and failed</li>
  */
 @Tag(SlowTest.TAG)
 @Tag(FlakyTest.TAG)
@@ -113,7 +118,7 @@ public class TestDrillbitResilience extends ClusterTest {
   private static final int NUM_RUNS = 3;
   private static final int PROBLEMATIC_TEST_NUM_RUNS = 3;
   private static final int TIMEOUT = 10;
-  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
+  private final static Level CURRENT_LOG_LEVEL = Level.DEBUG;
 
   /**
    * Note: Counting sys.memory executes a fragment on every drillbit. This is 
a better check in comparison to
@@ -281,7 +286,7 @@ public class TestDrillbitResilience extends ClusterTest {
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
   }
 
-  @Test // DRILL-3163, DRILL-3167
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3163, 
DRILL-3167, DRILL-7973, DRILL-8030
   @Timeout(value = TIMEOUT)
   public void foreman_runTryEnd() {
     final long before = countAllocatedMemory();
@@ -320,10 +325,9 @@ public class TestDrillbitResilience extends ClusterTest {
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
   }
 
-  // DRILL-3052: Since root fragment is waiting on data and leaf fragments are 
cancelled before they send any
-  // data to root, root will never run. This test will timeout if the root did 
not send the final state to Foreman.
+
   // DRILL-2383: Cancellation TC 1: cancel before any result set is returned.
-  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3192
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3052, 
DRILL-3192
   @Timeout(value = TIMEOUT)
   public void cancelWhenQueryIdArrives() {
     final long before = countAllocatedMemory();
@@ -333,7 +337,7 @@ public class TestDrillbitResilience extends ClusterTest {
       @Override
       public void queryIdArrived(final QueryId queryId) {
         super.queryIdArrived(queryId);
-        cancelAndResume(true);
+        cancelAndResume();
       }
     };
 
@@ -346,8 +350,9 @@ public class TestDrillbitResilience extends ClusterTest {
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
   }
 
-  @RepeatedTest(NUM_RUNS) // DRILL-6228
-  @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 2: cancel in the 
middle of fetching result set
+  // DRILL-2383: Cancellation TC 2: cancel in the middle of fetching result set
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-6228, 
DRILL-8030
+  @Timeout(value = TIMEOUT)
   public void cancelInMiddleOfFetchingResults() {
     final long before = countAllocatedMemory();
 
@@ -358,7 +363,7 @@ public class TestDrillbitResilience extends ClusterTest {
       public void dataArrived(final QueryDataBatch result, final 
ConnectionThrottle throttle) {
         if (!cancelRequested) {
           check(queryId != null, "Query id should not be null, since we have 
waited long enough.");
-          cancelAndResume(false);
+          cancelAndResume();
           cancelRequested = true;
         }
         result.release();
@@ -375,20 +380,19 @@ public class TestDrillbitResilience extends ClusterTest {
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
   }
 
-
-  @RepeatedTest(NUM_RUNS) // DRILL-6228
-  @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 3: cancel after all 
result set are produced but not all are fetched
+  // DRILL-2383: Cancellation TC 3: cancel after all result sets are produced 
but not all are fetched
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-6228, 
DRILL-8030
+  @Timeout(value = TIMEOUT)
   public void cancelAfterAllResultsProduced() {
     final long before = countAllocatedMemory();
 
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() 
{
-      private int count = 0;
 
       @Override
       public void dataArrived(final QueryDataBatch result, final 
ConnectionThrottle throttle) {
         if (lastDrillbit()) {
           check(queryId != null, "Query id should not be null, since we have 
waited long enough.");
-          cancelAndResume(false);
+          cancelAndResume();
         }
         result.release();
       }
@@ -403,8 +407,9 @@ public class TestDrillbitResilience extends ClusterTest {
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
   }
 
-  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // DRILL-3967
-  @Timeout(value = TIMEOUT) // DRILL-2383: Cancellation TC 4: cancel after 
everything is completed and fetched
+  // DRILL-2383: Cancellation TC 4: cancel after everything is completed and 
fetched
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3967, 
DRILL-7973
+  @Timeout(value = TIMEOUT)
   public void cancelAfterEverythingIsCompleted() {
     final long before = countAllocatedMemory();
 
@@ -416,7 +421,7 @@ public class TestDrillbitResilience extends ClusterTest {
           check(queryId != null, "Query id should not be null, since we have 
waited long enough.");
           // need to wait until all batches are processed, since 
foreman-cleanup - the pause that happened earlier,
           // than the client accepts queryCompleted() from UserResultsListener
-          cancelAndResume(true);
+          cancelAndResume();
         }
         result.release();
       }
@@ -457,7 +462,7 @@ public class TestDrillbitResilience extends ClusterTest {
     final String controls = Controls.newBuilder()
       .addException(DrillSqlWorker.class, exceptionDesc, exceptionClass, 0, 2)
       .build();
-    assertFailsWithException(controls, exceptionClass, exceptionDesc);
+    assertFailsWithException(controls, exceptionClass, exceptionDesc, 
TEST_QUERY);
 
     final long after = countAllocatedMemory();
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
@@ -473,7 +478,7 @@ public class TestDrillbitResilience extends ClusterTest {
     final String controls = Controls.newBuilder()
       .addException(FragmentsRunner.class, exceptionDesc, exceptionClass)
       .build();
-    assertFailsWithException(controls, exceptionClass, exceptionDesc);
+    assertFailsWithException(controls, exceptionClass, exceptionDesc, 
TEST_QUERY);
 
     final long after = countAllocatedMemory();
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
@@ -489,7 +494,7 @@ public class TestDrillbitResilience extends ClusterTest {
     final String controls = Controls.newBuilder()
       .addException(FragmentExecutor.class, exceptionDesc, exceptionClass)
       .build();
-    assertFailsWithException(controls, exceptionClass, exceptionDesc);
+    assertFailsWithException(controls, exceptionClass, exceptionDesc, 
TEST_QUERY);
 
     final long after = countAllocatedMemory();
     assertEquals(before, after, () -> String.format("We are leaking %d bytes", 
after - before));
@@ -564,7 +569,7 @@ public class TestDrillbitResilience extends ClusterTest {
     }
   }
 
-  @Test // DRILL-3193
+  @RepeatedTest(PROBLEMATIC_TEST_NUM_RUNS) // for more info: DRILL-3193, 
DRILL-7973
   @Timeout(value = TIMEOUT)
   public void interruptingWhileFragmentIsBlockedInAcquiringSendingTicket() {
     final long before = countAllocatedMemory();
@@ -604,7 +609,7 @@ public class TestDrillbitResilience extends ClusterTest {
         public void dataArrived(final QueryDataBatch result, final 
ConnectionThrottle throttle) {
           if (!cancelRequested) {
             check(queryId != null, "Query id should not be null, since we have 
waited long enough.");
-            cancelAndResume(false);
+            cancelAndResume();
             cancelRequested = true;
           }
           result.release();
@@ -620,7 +625,7 @@ public class TestDrillbitResilience extends ClusterTest {
     }
   }
 
-  @RepeatedTest(NUM_RUNS) // DRILL-3194
+  @RepeatedTest(NUM_RUNS) // for more info: DRILL-3194, DRILL-7973
   @Timeout(value = TIMEOUT)
   public void memoryLeaksWhenFailed() {
     client.alterSession(SLICE_TARGET, "10");
@@ -653,7 +658,7 @@ public class TestDrillbitResilience extends ClusterTest {
   }
 
   @Test
-  @Timeout(value = TIMEOUT) // DRILL-3065
+  @Timeout(value = TIMEOUT) // for more info: DRILL-3065
   public void failsAfterMSorterSorting() {
 
     // Note: must use an input table that returns more than one
@@ -675,7 +680,7 @@ public class TestDrillbitResilience extends ClusterTest {
   }
 
   @Test
-  @Timeout(value = TIMEOUT) // DRILL-3085
+  @Timeout(value = TIMEOUT) // for more info: DRILL-3085
   public void failsAfterMSorterSetup() {
 
     // Note: must use an input table that returns more than one
@@ -719,10 +724,10 @@ public class TestDrillbitResilience extends ClusterTest {
     /**
      * Method that cancels and resumes the query, in order.
      */
-    protected final void cancelAndResume(boolean sleepBeforeStart) {
+    protected final void cancelAndResume() {
       Preconditions.checkNotNull(queryId);
       final ExtendedLatch trigger = new ExtendedLatch(1);
-      (new CancellingThread(queryId, ex, trigger, sleepBeforeStart)).start();
+      (new CancellingThread(queryId, ex, trigger)).start();
       (new ResumingThread(queryId, ex, trigger)).start();
     }
 
@@ -779,7 +784,7 @@ public class TestDrillbitResilience extends ClusterTest {
       if (!cancelRequested) {
         logger.debug("First batch arrived, so cancelling thread started");
         check(queryId != null, "Query id should not be null, since we have 
waited long enough.");
-        (new CancellingThread(queryId, ex, null, false)).start();
+        (new CancellingThread(queryId, ex, null)).start();
         cancelRequested = true;
       }
       result.release();
@@ -793,25 +798,21 @@ public class TestDrillbitResilience extends ClusterTest {
     private final QueryId queryId;
     private final Pointer<Exception> ex;
     private final ExtendedLatch latch;
-    private final boolean sleepBeforeStart;
 
-    public CancellingThread(QueryId queryId, Pointer<Exception> ex, 
ExtendedLatch latch, boolean sleepBeforeStart) {
+    public CancellingThread(QueryId queryId, Pointer<Exception> ex, 
ExtendedLatch latch) {
       this.queryId = queryId;
       this.ex = ex;
       this.latch = latch;
-      this.sleepBeforeStart = sleepBeforeStart;
       logger.debug("Cancelling thread created");
     }
 
     @Override
     public void run() {
-      if(sleepBeforeStart) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          logger.debug("Sleep thread interrupted. Ignore it");
-          // just ignore
-        }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        logger.debug("Sleep thread interrupted. Ignore it");
+        // just ignore
       }
       logger.debug("Cancelling {} query started", queryId);
       final DrillRpcFuture<Ack> cancelAck = 
client.client().cancelQuery(queryId);
@@ -913,8 +914,8 @@ public class TestDrillbitResilience extends ClusterTest {
   private long countAllocatedMemory() {
     // wait to make sure all fragments finished cleaning up
     try {
-      logger.debug("Sleep thread for 0.05 seconds");
-      Thread.sleep(500);
+      logger.debug("Sleep thread for 2 seconds");
+      Thread.sleep(2000);
     } catch (InterruptedException e) {
       logger.debug("Sleep thread interrupted. Ignore it", e);
       // just ignore
@@ -955,9 +956,22 @@ public class TestDrillbitResilience extends ClusterTest {
     assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
   }
 
-  private void assertFailsWithException(final String controls, final Class<? 
extends Throwable> exceptionClass,
-                                               final String exceptionDesc) {
-    assertFailsWithException(controls, exceptionClass, exceptionDesc, 
TEST_QUERY);
+  /**
+   * The same as {@link #assertFailsWithException}, but also allows COMPLETED 
state for the query.<br>
+   * Note: in some cases we are completing the query faster than run-try-end 
exception is injected and thrown in
+   * Foreman. The completed state is fine for such cases
+   */
+  private void assertFailsOrCompletedWithException(final String controls, 
final Class<? extends Throwable> exceptionClass,
+                                        final String exceptionDesc, final 
String query) {
+    setControls(controls);
+    final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
+    QueryTestUtil.testWithListener(client.client(), QueryType.SQL, query, 
listener);
+    final Pair<QueryState, Exception> result = listener.waitForCompletion();
+    final QueryState state = result.getFirst();
+
+    assertTrue(state.equals(QueryState.FAILED) || 
state.equals(QueryState.COMPLETED),
+      () -> String.format("Query state should be FAILED (and not %s).", 
state));
+    assertExceptionMessage(result.getSecond(), exceptionClass, exceptionDesc);
   }
 
   /**
@@ -970,7 +984,7 @@ public class TestDrillbitResilience extends ClusterTest {
     final String controls = Controls.newBuilder()
       .addException(Foreman.class, desc, ForemanException.class)
       .build();
-    assertFailsWithException(controls, ForemanException.class, desc);
+    assertFailsOrCompletedWithException(controls, ForemanException.class, 
desc, TEST_QUERY);
   }
 
   /**

Reply via email to