This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new adbebc174a Fix flaky tests in SeekableStreamSupervisorStateTest (#12875)
adbebc174a is described below

commit adbebc174ab56bf9605658e80045dda3fe4ec527
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Tue Aug 16 18:38:03 2022 +0530

    Fix flaky tests in SeekableStreamSupervisorStateTest (#12875)
    
    * Fix flaky test in SeekableStreamSupervisorStateTest
    
    * Fix for flaky security IT Test
    
    * fix tests
    
    * retry queries if there is some flakiness
---
 .../java/org/apache/druid/utils/Throwables.java    |  4 +-
 .../common/task/CompactionTaskParallelRunTest.java |  5 +-
 .../druid/indexing/common/task/IndexTaskTest.java  | 63 ++++++++++++----------
 .../SeekableStreamSupervisorStateTest.java         | 43 +++++++++++----
 .../clients/AbstractQueryResourceTestClient.java   | 36 +++++++++++--
 .../org/apache/druid/tests/security/ITTLSTest.java | 22 ++++++--
 6 files changed, 123 insertions(+), 50 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/utils/Throwables.java 
b/core/src/main/java/org/apache/druid/utils/Throwables.java
index b58025821f..3f9659ec82 100644
--- a/core/src/main/java/org/apache/druid/utils/Throwables.java
+++ b/core/src/main/java/org/apache/druid/utils/Throwables.java
@@ -31,10 +31,10 @@ public final class Throwables
    * @return null if not found otherwise the cause exception that is of same 
type as searchFor
    */
   @Nullable
-  public static Throwable getCauseOfType(Throwable t, Class<? extends 
Throwable> searchFor)
+  public static <T extends Throwable> T getCauseOfType(Throwable t, Class<T> 
searchFor)
   {
     if (searchFor.isAssignableFrom(t.getClass())) {
-      return t;
+      return (T) t;
     } else {
       if (t.getCause() != null) {
         return getCauseOfType(t.getCause(), searchFor);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index b1e911497b..71d6c88a10 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -76,6 +77,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
+
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
@@ -807,7 +809,8 @@ public class CompactionTaskParallelRunTest extends 
AbstractParallelIndexSupervis
   private Set<DataSegment> runTask(Task task)
   {
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == 
LockGranularity.TIME_CHUNK);
-    Assert.assertEquals(TaskState.SUCCESS, 
getIndexingServiceClient().runAndWait(task).getStatusCode());
+    TaskStatus status = getIndexingServiceClient().runAndWait(task);
+    Assert.assertEquals(status.toString(), TaskState.SUCCESS, 
status.getStatusCode());
     return getIndexingServiceClient().getPublishedSegments(task);
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index babc3ff1fd..125e8f1be8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -262,7 +262,7 @@ public class IndexTaskTest extends IngestionTestBase
 
     Assert.assertFalse(indexTask.supportsQueries());
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
     Assert.assertEquals(1, segments.size());
     Assert.assertEquals(ImmutableList.of("ts", "dim", "valDim"), 
segments.get(0).getDimensions());
     Assert.assertEquals(ImmutableList.of("valMet"), 
segments.get(0).getMetrics());
@@ -317,7 +317,7 @@ public class IndexTaskTest extends IngestionTestBase
 
     Assert.assertFalse(indexTask.supportsQueries());
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
     Assert.assertEquals(1, segments.size());
     // only empty string dimensions are ignored currently
     Assert.assertEquals(ImmutableList.of("ts", "valDim"), 
segments.get(0).getDimensions());
@@ -354,7 +354,7 @@ public class IndexTaskTest extends IngestionTestBase
 
     Assert.assertFalse(indexTask.supportsQueries());
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(2, segments.size());
 
@@ -458,7 +458,7 @@ public class IndexTaskTest extends IngestionTestBase
 
     Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
     DataSegment segment = segments.get(0);
@@ -549,7 +549,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
   }
@@ -585,7 +585,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
   }
@@ -617,7 +617,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
 
@@ -660,7 +660,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
 
@@ -701,7 +701,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(2, segments.size());
 
@@ -776,7 +776,7 @@ public class IndexTaskTest extends IngestionTestBase
 
     Assert.assertEquals("index_append_test", indexTask.getGroupId());
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(2, 
taskRunner.getTaskActionClient().getActionCount(SegmentAllocateAction.class));
     Assert.assertEquals(2, segments.size());
@@ -823,7 +823,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(3, segments.size());
 
@@ -929,7 +929,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
 
@@ -987,7 +987,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
 
@@ -1033,7 +1033,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(6, segments.size());
 
@@ -1077,7 +1077,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(3, segments.size());
 
@@ -1120,7 +1120,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(5, segments.size());
 
@@ -1447,7 +1447,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(Collections.singletonList("d"), 
segments.get(0).getDimensions());
     Assert.assertEquals(Collections.singletonList("val"), 
segments.get(0).getMetrics());
@@ -2117,7 +2117,7 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    final List<DataSegment> segments = runTask(indexTask).rhs;
+    final List<DataSegment> segments = runSuccessfulTask(indexTask);
     // the order of result segments can be changed because hash shardSpec is 
used.
     // the below loop is to make this test deterministic.
     Assert.assertEquals(2, segments.size());
@@ -2255,7 +2255,7 @@ public class IndexTaskTest extends IngestionTestBase
           null
       );
 
-      final List<DataSegment> segments = runTask(indexTask).rhs;
+      final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
       Assert.assertEquals(5, segments.size());
 
@@ -2320,7 +2320,7 @@ public class IndexTaskTest extends IngestionTestBase
           null
       );
 
-      final List<DataSegment> segments = runTask(indexTask).rhs;
+      final List<DataSegment> segments = runSuccessfulTask(indexTask);
 
       Assert.assertEquals(5, segments.size());
 
@@ -2395,7 +2395,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with YEAR segment granularity
-    List<DataSegment> segments = runTask(indexTask).rhs;
+    List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
     Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
@@ -2424,7 +2424,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with overwrite and MINUTE segment granularity
-    segments = runTask(indexTask).rhs;
+    segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(3, segments.size());
     Set<DataSegment> usedSegmentsBeforeAfterOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
@@ -2479,7 +2479,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with DAY segment granularity
-    List<DataSegment> segments = runTask(indexTask).rhs;
+    List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
     Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
@@ -2508,7 +2508,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with overwrite and HOUR segment granularity
-    segments = runTask(indexTask).rhs;
+    segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
     Set<DataSegment> usedSegmentsBeforeAfterOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
@@ -2571,7 +2571,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with DAY segment granularity
-    List<DataSegment> segments = runTask(indexTask).rhs;
+    List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
     Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
@@ -2600,7 +2600,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with overwrite and HOUR segment granularity
-    segments = runTask(indexTask).rhs;
+    segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(24, segments.size());
     Set<DataSegment> usedSegmentsBeforeAfterOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
@@ -2649,7 +2649,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with DAY segment granularity
-    List<DataSegment> segments = runTask(indexTask).rhs;
+    List<DataSegment> segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size());
     Set<DataSegment> usedSegmentsBeforeOverwrite = 
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
 Intervals.ETERNITY, true).get());
@@ -2688,7 +2688,7 @@ public class IndexTaskTest extends IngestionTestBase
     );
 
     // Ingest data with overwrite and same segment granularity
-    segments = runTask(indexTask).rhs;
+    segments = runSuccessfulTask(indexTask);
 
     Assert.assertEquals(1, segments.size()); // one tombstone
     Assert.assertTrue(segments.get(0).isTombstone());
@@ -2731,6 +2731,13 @@ public class IndexTaskTest extends IngestionTestBase
     );
   }
 
+  private List<DataSegment> runSuccessfulTask(IndexTask task) throws Exception
+  {
+    Pair<TaskStatus, List<DataSegment>> pair = runTask(task);
+    Assert.assertEquals(pair.lhs.toString(), TaskState.SUCCESS, 
pair.lhs.getStatusCode());
+    return pair.rhs;
+  }
+
   private Pair<TaskStatus, List<DataSegment>> runTask(IndexTask task) throws 
Exception
   {
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == 
LockGranularity.TIME_CHUNK);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index c67444a4de..82f3b58631 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -628,9 +628,10 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(false);
 
-    CountDownLatch latch = new CountDownLatch(2);
+    CountDownLatch latch = new CountDownLatch(1);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
+        TestEmittingTestSeekableStreamSupervisor.LAG,
         ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
         ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
     );
@@ -671,9 +672,10 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(false);
 
-    CountDownLatch latch = new CountDownLatch(2);
+    CountDownLatch latch = new CountDownLatch(1);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
+        TestEmittingTestSeekableStreamSupervisor.LAG,
         ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
         null
     );
@@ -707,9 +709,10 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(false);
 
-    CountDownLatch latch = new CountDownLatch(2);
+    CountDownLatch latch = new CountDownLatch(1);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
+        TestEmittingTestSeekableStreamSupervisor.LAG,
         null,
         ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
     );
@@ -746,6 +749,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     CountDownLatch latch = new CountDownLatch(1);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
+        TestEmittingTestSeekableStreamSupervisor.NOTICE_QUEUE,
         null,
         null
     );
@@ -775,14 +779,14 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   public void testEmitNoticesTime() throws Exception
   {
     expectEmitterSupervisor(false);
-    CountDownLatch latch = new CountDownLatch(2);
+    CountDownLatch latch = new CountDownLatch(1);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
+        TestEmittingTestSeekableStreamSupervisor.NOTICE_PROCESS,
         null,
         null
     );
     supervisor.start();
-    supervisor.emitNoticesTime();
     Assert.assertTrue(supervisor.stateManager.isHealthy());
     Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState());
     Assert.assertEquals(BasicState.PENDING, 
supervisor.stateManager.getSupervisorState().getBasicState());
@@ -794,9 +798,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     events = filterMetrics(events, whitelist);
     Assert.assertEquals(1, events.size());
     Assert.assertEquals("ingest/notices/time", 
events.get(0).toMap().get("metric"));
-    Assert.assertEquals(500L, events.get(0).toMap().get("value"));
+    Assert.assertTrue(String.valueOf(events.get(0).toMap().get("value")), 
(long) events.get(0).toMap().get("value") > 0);
     Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
-    Assert.assertEquals("dummyNoticeType", 
events.get(0).toMap().get("noticeType"));
+    Assert.assertEquals("run_notice", events.get(0).toMap().get("noticeType"));
     verifyAll();
   }
 
@@ -805,9 +809,10 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   {
     expectEmitterSupervisor(true);
 
-    CountDownLatch latch = new CountDownLatch(2);
+    CountDownLatch latch = new CountDownLatch(1);
     TestEmittingTestSeekableStreamSupervisor supervisor = new 
TestEmittingTestSeekableStreamSupervisor(
         latch,
+        TestEmittingTestSeekableStreamSupervisor.LAG,
         ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
         ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
     );
@@ -1297,14 +1302,22 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     private final CountDownLatch latch;
     private final Map<String, Long> partitionsRecordLag;
     private final Map<String, Long> partitionsTimeLag;
+    private final byte metricFlag;
+
+    private static final byte LAG = 0x01;
+    private static final byte NOTICE_QUEUE = 0x02;
+    private static final byte NOTICE_PROCESS = 0x04;
+
 
     TestEmittingTestSeekableStreamSupervisor(
         CountDownLatch latch,
+        byte metricFlag,
         Map<String, Long> partitionsRecordLag,
         Map<String, Long> partitionsTimeLag
     )
     {
       this.latch = latch;
+      this.metricFlag = metricFlag;
       this.partitionsRecordLag = partitionsRecordLag;
       this.partitionsTimeLag = partitionsTimeLag;
     }
@@ -1326,6 +1339,9 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     @Override
     protected void emitLag()
     {
+      if ((metricFlag & LAG) == 0) {
+        return;
+      }
       super.emitLag();
       if (stateManager.isSteadyState()) {
         latch.countDown();
@@ -1335,13 +1351,20 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     @Override
     protected void emitNoticesQueueSize()
     {
+      if ((metricFlag & NOTICE_QUEUE) == 0) {
+        return;
+      }
       super.emitNoticesQueueSize();
       latch.countDown();
     }
 
-    public void emitNoticesTime()
+    @Override
+    public void emitNoticeProcessTime(String noticeType, long timeInMillis)
     {
-      super.emitNoticeProcessTime("dummyNoticeType", 500);
+      if ((metricFlag & NOTICE_PROCESS) == 0) {
+        return;
+      }
+      super.emitNoticeProcessTime(noticeType, timeInMillis);
       latch.countDown();
     }
 
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
index 907ac8d6b4..9b7911cc84 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
@@ -26,6 +26,7 @@ import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.Request;
 import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
@@ -33,12 +34,16 @@ import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.testing.guice.TestClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.utils.Throwables;
+import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nullable;
 import javax.ws.rs.core.MediaType;
+
 import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
@@ -47,9 +52,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public abstract class AbstractQueryResourceTestClient<QueryType>
 {
+  private static final Logger LOG = new 
Logger(AbstractQueryResourceTestClient.class);
+
   final String contentTypeHeader;
 
   /**
@@ -145,10 +153,30 @@ public abstract class 
AbstractQueryResourceTestClient<QueryType>
         request.addHeader(HttpHeaders.Names.ACCEPT, this.acceptHeader);
       }
 
-      BytesFullResponseHolder response = httpClient.go(
-          request,
-          new BytesFullResponseHandler()
-      ).get();
+      final AtomicReference<BytesFullResponseHolder> responseRef = new 
AtomicReference<>();
+
+      ITRetryUtil.retryUntil(() -> {
+        try {
+          responseRef.set(httpClient.go(
+              request,
+              new BytesFullResponseHandler()
+          ).get());
+        }
+        catch (Throwable t) {
+          ChannelException ce = Throwables.getCauseOfType(t, 
ChannelException.class);
+          if (ce != null) {
+            LOG.info(ce, "Encountered a channel exception. Retrying the query 
request");
+            return false;
+          }
+        }
+        return true;
+      },
+          true,
+          1000,
+          3,
+          "waiting for queries to complete");
+
+      BytesFullResponseHolder response = responseRef.get();
 
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
         throw new ISE(
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java
index d1c60b34f4..e8983b474c 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java
@@ -56,6 +56,7 @@ import javax.ws.rs.core.MediaType;
 
 import java.io.IOException;
 import java.net.URL;
+import java.nio.channels.ClosedChannelException;
 
 @Test(groups = TestNGGroup.SECURITY)
 @Guice(moduleFactory = DruidTestModuleFactory.class)
@@ -446,11 +447,7 @@ public class ITTLSTest
       catch (RuntimeException re) {
         Throwable rootCause = Throwables.getRootCause(re);
 
-        if (rootCause instanceof IOException
-            && (null != rootCause.getMessage())
-            && ("Broken pipe".equals(rootCause.getMessage())
-            || "Connection reset by peer".contains(rootCause.getMessage()))
-        ) {
+        if (isRetriable(rootCause)) {
           if (retries > MAX_CONNECTION_EXCEPTION_RETRIES) {
             Assert.fail(StringUtils.format(
                 "Broken pipe / connection reset retries exhausted, test 
failed, did not get %s.",
@@ -537,4 +534,19 @@ public class ITTLSTest
       throw new RuntimeException(e);
     }
   }
+
+  private boolean isRetriable(Throwable ex)
+  {
+    if (!(ex instanceof IOException)) {
+      return false;
+    }
+
+    if (ex instanceof ClosedChannelException) {
+      return true;
+    }
+
+    return null != ex.getMessage()
+        && ("Broken pipe".equals(ex.getMessage())
+            || "Connection reset by peer".contains(ex.getMessage()));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to