This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new adbebc174a Fix flaky tests in SeekableStreamSupervisorStateTest (#12875)
adbebc174a is described below
commit adbebc174ab56bf9605658e80045dda3fe4ec527
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Tue Aug 16 18:38:03 2022 +0530
Fix flaky tests in SeekableStreamSupervisorStateTest (#12875)
* Fix flaky test in SeekableStreamSupervisorStateTest
* Fix for flaky security IT Test
* fix tests
* retry queries if there is some flakiness
---
.../java/org/apache/druid/utils/Throwables.java | 4 +-
.../common/task/CompactionTaskParallelRunTest.java | 5 +-
.../druid/indexing/common/task/IndexTaskTest.java | 63 ++++++++++++----------
.../SeekableStreamSupervisorStateTest.java | 43 +++++++++++----
.../clients/AbstractQueryResourceTestClient.java | 36 +++++++++++--
.../org/apache/druid/tests/security/ITTLSTest.java | 22 ++++++--
6 files changed, 123 insertions(+), 50 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/utils/Throwables.java b/core/src/main/java/org/apache/druid/utils/Throwables.java
index b58025821f..3f9659ec82 100644
--- a/core/src/main/java/org/apache/druid/utils/Throwables.java
+++ b/core/src/main/java/org/apache/druid/utils/Throwables.java
@@ -31,10 +31,10 @@ public final class Throwables
* @return null if not found otherwise the cause exception that is of same
type as searchFor
*/
@Nullable
- public static Throwable getCauseOfType(Throwable t, Class<? extends
Throwable> searchFor)
+ public static <T extends Throwable> T getCauseOfType(Throwable t, Class<T>
searchFor)
{
if (searchFor.isAssignableFrom(t.getClass())) {
- return t;
+ return (T) t;
} else {
if (t.getCause() != null) {
return getCauseOfType(t.getCause(), searchFor);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index b1e911497b..71d6c88a10 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -33,6 +33,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -76,6 +77,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
+
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
@@ -807,7 +809,8 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
private Set<DataSegment> runTask(Task task)
{
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity ==
LockGranularity.TIME_CHUNK);
- Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
+ TaskStatus status = getIndexingServiceClient().runAndWait(task);
+ Assert.assertEquals(status.toString(), TaskState.SUCCESS,
status.getStatusCode());
return getIndexingServiceClient().getPublishedSegments(task);
}
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index babc3ff1fd..125e8f1be8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -262,7 +262,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertFalse(indexTask.supportsQueries());
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
Assert.assertEquals(ImmutableList.of("ts", "dim", "valDim"),
segments.get(0).getDimensions());
Assert.assertEquals(ImmutableList.of("valMet"),
segments.get(0).getMetrics());
@@ -317,7 +317,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertFalse(indexTask.supportsQueries());
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
// only empty string dimensions are ignored currently
Assert.assertEquals(ImmutableList.of("ts", "valDim"),
segments.get(0).getDimensions());
@@ -354,7 +354,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertFalse(indexTask.supportsQueries());
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(2, segments.size());
@@ -458,7 +458,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals(indexTask.getId(), indexTask.getGroupId());
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
DataSegment segment = segments.get(0);
@@ -549,7 +549,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
}
@@ -585,7 +585,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
}
@@ -617,7 +617,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
@@ -660,7 +660,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
@@ -701,7 +701,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(2, segments.size());
@@ -776,7 +776,7 @@ public class IndexTaskTest extends IngestionTestBase
Assert.assertEquals("index_append_test", indexTask.getGroupId());
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(2,
taskRunner.getTaskActionClient().getActionCount(SegmentAllocateAction.class));
Assert.assertEquals(2, segments.size());
@@ -823,7 +823,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(3, segments.size());
@@ -929,7 +929,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
@@ -987,7 +987,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
@@ -1033,7 +1033,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(6, segments.size());
@@ -1077,7 +1077,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(3, segments.size());
@@ -1120,7 +1120,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(5, segments.size());
@@ -1447,7 +1447,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(Collections.singletonList("d"),
segments.get(0).getDimensions());
Assert.assertEquals(Collections.singletonList("val"),
segments.get(0).getMetrics());
@@ -2117,7 +2117,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
// the order of result segments can be changed because hash shardSpec is
used.
// the below loop is to make this test deterministic.
Assert.assertEquals(2, segments.size());
@@ -2255,7 +2255,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(5, segments.size());
@@ -2320,7 +2320,7 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- final List<DataSegment> segments = runTask(indexTask).rhs;
+ final List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(5, segments.size());
@@ -2395,7 +2395,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with YEAR segment granularity
- List<DataSegment> segments = runTask(indexTask).rhs;
+ List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
Set<DataSegment> usedSegmentsBeforeOverwrite =
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
Intervals.ETERNITY, true).get());
@@ -2424,7 +2424,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with overwrite and MINUTE segment granularity
- segments = runTask(indexTask).rhs;
+ segments = runSuccessfulTask(indexTask);
Assert.assertEquals(3, segments.size());
Set<DataSegment> usedSegmentsBeforeAfterOverwrite =
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
Intervals.ETERNITY, true).get());
@@ -2479,7 +2479,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with DAY segment granularity
- List<DataSegment> segments = runTask(indexTask).rhs;
+ List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
Set<DataSegment> usedSegmentsBeforeOverwrite =
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
Intervals.ETERNITY, true).get());
@@ -2508,7 +2508,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with overwrite and HOUR segment granularity
- segments = runTask(indexTask).rhs;
+ segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
Set<DataSegment> usedSegmentsBeforeAfterOverwrite =
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
Intervals.ETERNITY, true).get());
@@ -2571,7 +2571,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with DAY segment granularity
- List<DataSegment> segments = runTask(indexTask).rhs;
+ List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
Set<DataSegment> usedSegmentsBeforeOverwrite =
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
Intervals.ETERNITY, true).get());
@@ -2600,7 +2600,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with overwrite and HOUR segment granularity
- segments = runTask(indexTask).rhs;
+ segments = runSuccessfulTask(indexTask);
Assert.assertEquals(24, segments.size());
Set<DataSegment> usedSegmentsBeforeAfterOverwrite =
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
Intervals.ETERNITY, true).get());
@@ -2649,7 +2649,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with DAY segment granularity
- List<DataSegment> segments = runTask(indexTask).rhs;
+ List<DataSegment> segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size());
Set<DataSegment> usedSegmentsBeforeOverwrite =
Sets.newHashSet(getSegmentsMetadataManager().iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(DATASOURCE,
Intervals.ETERNITY, true).get());
@@ -2688,7 +2688,7 @@ public class IndexTaskTest extends IngestionTestBase
);
// Ingest data with overwrite and same segment granularity
- segments = runTask(indexTask).rhs;
+ segments = runSuccessfulTask(indexTask);
Assert.assertEquals(1, segments.size()); // one tombstone
Assert.assertTrue(segments.get(0).isTombstone());
@@ -2731,6 +2731,13 @@ public class IndexTaskTest extends IngestionTestBase
);
}
+ private List<DataSegment> runSuccessfulTask(IndexTask task) throws Exception
+ {
+ Pair<TaskStatus, List<DataSegment>> pair = runTask(task);
+ Assert.assertEquals(pair.lhs.toString(), TaskState.SUCCESS,
pair.lhs.getStatusCode());
+ return pair.rhs;
+ }
+
private Pair<TaskStatus, List<DataSegment>> runTask(IndexTask task) throws
Exception
{
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity ==
LockGranularity.TIME_CHUNK);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index c67444a4de..82f3b58631 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -628,9 +628,10 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
expectEmitterSupervisor(false);
- CountDownLatch latch = new CountDownLatch(2);
+ CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new
TestEmittingTestSeekableStreamSupervisor(
latch,
+ TestEmittingTestSeekableStreamSupervisor.LAG,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
@@ -671,9 +672,10 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
expectEmitterSupervisor(false);
- CountDownLatch latch = new CountDownLatch(2);
+ CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new
TestEmittingTestSeekableStreamSupervisor(
latch,
+ TestEmittingTestSeekableStreamSupervisor.LAG,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
null
);
@@ -707,9 +709,10 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
expectEmitterSupervisor(false);
- CountDownLatch latch = new CountDownLatch(2);
+ CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new
TestEmittingTestSeekableStreamSupervisor(
latch,
+ TestEmittingTestSeekableStreamSupervisor.LAG,
null,
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
@@ -746,6 +749,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new
TestEmittingTestSeekableStreamSupervisor(
latch,
+ TestEmittingTestSeekableStreamSupervisor.NOTICE_QUEUE,
null,
null
);
@@ -775,14 +779,14 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
public void testEmitNoticesTime() throws Exception
{
expectEmitterSupervisor(false);
- CountDownLatch latch = new CountDownLatch(2);
+ CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new
TestEmittingTestSeekableStreamSupervisor(
latch,
+ TestEmittingTestSeekableStreamSupervisor.NOTICE_PROCESS,
null,
null
);
supervisor.start();
- supervisor.emitNoticesTime();
Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING,
supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING,
supervisor.stateManager.getSupervisorState().getBasicState());
@@ -794,9 +798,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
events = filterMetrics(events, whitelist);
Assert.assertEquals(1, events.size());
Assert.assertEquals("ingest/notices/time",
events.get(0).toMap().get("metric"));
- Assert.assertEquals(500L, events.get(0).toMap().get("value"));
+ Assert.assertTrue(String.valueOf(events.get(0).toMap().get("value")),
(long) events.get(0).toMap().get("value") > 0);
Assert.assertEquals("testDS", events.get(0).toMap().get("dataSource"));
- Assert.assertEquals("dummyNoticeType",
events.get(0).toMap().get("noticeType"));
+ Assert.assertEquals("run_notice", events.get(0).toMap().get("noticeType"));
verifyAll();
}
@@ -805,9 +809,10 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
expectEmitterSupervisor(true);
- CountDownLatch latch = new CountDownLatch(2);
+ CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new
TestEmittingTestSeekableStreamSupervisor(
latch,
+ TestEmittingTestSeekableStreamSupervisor.LAG,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
@@ -1297,14 +1302,22 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
private final CountDownLatch latch;
private final Map<String, Long> partitionsRecordLag;
private final Map<String, Long> partitionsTimeLag;
+ private final byte metricFlag;
+
+ private static final byte LAG = 0x01;
+ private static final byte NOTICE_QUEUE = 0x02;
+ private static final byte NOTICE_PROCESS = 0x04;
+
TestEmittingTestSeekableStreamSupervisor(
CountDownLatch latch,
+ byte metricFlag,
Map<String, Long> partitionsRecordLag,
Map<String, Long> partitionsTimeLag
)
{
this.latch = latch;
+ this.metricFlag = metricFlag;
this.partitionsRecordLag = partitionsRecordLag;
this.partitionsTimeLag = partitionsTimeLag;
}
@@ -1326,6 +1339,9 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
@Override
protected void emitLag()
{
+ if ((metricFlag & LAG) == 0) {
+ return;
+ }
super.emitLag();
if (stateManager.isSteadyState()) {
latch.countDown();
@@ -1335,13 +1351,20 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
@Override
protected void emitNoticesQueueSize()
{
+ if ((metricFlag & NOTICE_QUEUE) == 0) {
+ return;
+ }
super.emitNoticesQueueSize();
latch.countDown();
}
- public void emitNoticesTime()
+ @Override
+ public void emitNoticeProcessTime(String noticeType, long timeInMillis)
{
- super.emitNoticeProcessTime("dummyNoticeType", 500);
+ if ((metricFlag & NOTICE_PROCESS) == 0) {
+ return;
+ }
+ super.emitNoticeProcessTime(noticeType, timeInMillis);
latch.countDown();
}
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
index 907ac8d6b4..9b7911cc84 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
@@ -26,6 +26,7 @@ import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
@@ -33,12 +34,16 @@ import org.apache.druid.java.util.http.client.response.BytesFullResponseHolder;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.testing.guice.TestClient;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.utils.Throwables;
+import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
+
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
@@ -47,9 +52,12 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractQueryResourceTestClient<QueryType>
{
+ private static final Logger LOG = new
Logger(AbstractQueryResourceTestClient.class);
+
final String contentTypeHeader;
/**
@@ -145,10 +153,30 @@ public abstract class AbstractQueryResourceTestClient<QueryType>
request.addHeader(HttpHeaders.Names.ACCEPT, this.acceptHeader);
}
- BytesFullResponseHolder response = httpClient.go(
- request,
- new BytesFullResponseHandler()
- ).get();
+ final AtomicReference<BytesFullResponseHolder> responseRef = new
AtomicReference<>();
+
+ ITRetryUtil.retryUntil(() -> {
+ try {
+ responseRef.set(httpClient.go(
+ request,
+ new BytesFullResponseHandler()
+ ).get());
+ }
+ catch (Throwable t) {
+ ChannelException ce = Throwables.getCauseOfType(t,
ChannelException.class);
+ if (ce != null) {
+ LOG.info(ce, "Encountered a channel exception. Retrying the query
request");
+ return false;
+ }
+ }
+ return true;
+ },
+ true,
+ 1000,
+ 3,
+ "waiting for queries to complete");
+
+ BytesFullResponseHolder response = responseRef.get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java
index d1c60b34f4..e8983b474c 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITTLSTest.java
@@ -56,6 +56,7 @@ import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.URL;
+import java.nio.channels.ClosedChannelException;
@Test(groups = TestNGGroup.SECURITY)
@Guice(moduleFactory = DruidTestModuleFactory.class)
@@ -446,11 +447,7 @@ public class ITTLSTest
catch (RuntimeException re) {
Throwable rootCause = Throwables.getRootCause(re);
- if (rootCause instanceof IOException
- && (null != rootCause.getMessage())
- && ("Broken pipe".equals(rootCause.getMessage())
- || "Connection reset by peer".contains(rootCause.getMessage()))
- ) {
+ if (isRetriable(rootCause)) {
if (retries > MAX_CONNECTION_EXCEPTION_RETRIES) {
Assert.fail(StringUtils.format(
"Broken pipe / connection reset retries exhausted, test
failed, did not get %s.",
@@ -537,4 +534,19 @@ public class ITTLSTest
throw new RuntimeException(e);
}
}
+
+ private boolean isRetriable(Throwable ex)
+ {
+ if (!(ex instanceof IOException)) {
+ return false;
+ }
+
+ if (ex instanceof ClosedChannelException) {
+ return true;
+ }
+
+ return null != ex.getMessage()
+ && ("Broken pipe".equals(ex.getMessage())
+ || "Connection reset by peer".contains(ex.getMessage()));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]