This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5bd7924 Fix kinesis integration test (#10696)
5bd7924 is described below
commit 5bd792429664d8f5274ffea314f8bdeb296eb88a
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Mon Dec 21 12:57:40 2020 -0800
Fix kinesis integration test (#10696)
* fix kinesis IT
* fix checkstyle
---
.../clients/OverlordResourceTestClient.java | 36 +++++++++++++++++++---
.../apache/druid/testing/utils/ITRetryUtil.java | 2 +-
.../tests/indexer/AbstractStreamIndexingTest.java | 34 ++++++++++++--------
.../stream/data/supervisor_spec_template.json | 2 +-
4 files changed, 55 insertions(+), 19 deletions(-)
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 4a5b53c..84604ec 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -306,14 +306,14 @@ public class OverlordResourceTestClient
}
}
- public void shutdownSupervisor(String id)
+ public void terminateSupervisor(String id)
{
try {
StatusResponseHolder response = httpClient.go(
new Request(
HttpMethod.POST,
new URL(StringUtils.format(
- "%ssupervisor/%s/shutdown",
+ "%ssupervisor/%s/terminate",
getIndexerURL(),
StringUtils.urlEncode(id)
))
@@ -322,12 +322,40 @@ public class OverlordResourceTestClient
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
throw new ISE(
- "Error while shutting down supervisor, response [%s %s]",
+ "Error while terminating supervisor, response [%s %s]",
response.getStatus(),
response.getContent()
);
}
- LOG.info("Shutdown supervisor with id[%s]", id);
+ LOG.info("Terminate supervisor with id[%s]", id);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void shutdownTask(String id)
+ {
+ try {
+ StatusResponseHolder response = httpClient.go(
+ new Request(
+ HttpMethod.POST,
+ new URL(StringUtils.format(
+ "%stask/%s/shutdown",
+ getIndexerURL(),
+ StringUtils.urlEncode(id)
+ ))
+ ),
+ StatusResponseHandler.getInstance()
+ ).get();
+ if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ throw new ISE(
+ "Error while shutdown task, response [%s %s]",
+ response.getStatus(),
+ response.getContent()
+ );
+ }
+ LOG.info("Shutdown task with id[%s]", id);
}
catch (Exception e) {
throw new RuntimeException(e);
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java
index d107c87..3ef2f71 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java
@@ -30,7 +30,7 @@ public class ITRetryUtil
private static final Logger LOG = new Logger(ITRetryUtil.class);
- public static final int DEFAULT_RETRY_COUNT = 120; // 10 minutes
+ public static final int DEFAULT_RETRY_COUNT = 240; // 20 minutes
public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(5);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 7285af1..5c2a18e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.TaskResponseObject;
import org.apache.druid.testing.utils.DruidClusterAdminClient;
import org.apache.druid.testing.utils.EventSerializer;
import org.apache.druid.testing.utils.ITRetryUtil;
@@ -488,8 +489,13 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
name -> new LongSumAggregatorFactory(name, "count")
),
StringUtils.format(
- "dataSource[%s] consumed [%,d] events",
+ "dataSource[%s] consumed [%,d] events, expected [%,d]",
generatedTestConfig.getFullDatasourceName(),
+ this.queryHelper.countRows(
+ generatedTestConfig.getFullDatasourceName(),
+ Intervals.ETERNITY,
+ name -> new LongSumAggregatorFactory(name, "count")
+ ),
numWritten
)
);
@@ -499,22 +505,18 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
.apply(getResourceAsString(QUERIES_FILE));
// this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing
this.queryHelper.testQueriesFromString(querySpec);
- LOG.info("Shutting down supervisor");
- indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
- // Clear supervisor ID to not shutdown again.
- generatedTestConfig.setSupervisorId(null);
- // wait for all indexing tasks to finish
+
+ // All data written to stream within 10 secs.
+ // Each task duration is 30 secs. Hence, one task will be able to consume all data from the stream.
LOG.info("Waiting for all indexing tasks to finish");
ITRetryUtil.retryUntilTrue(
- () -> (indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size() == 0),
- "Waiting for Tasks Completion"
+ () -> (indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size() > 0),
+ "Waiting for Task Completion"
);
+
// wait for segments to be handed off
- ITRetryUtil.retryUntil(
+ ITRetryUtil.retryUntilTrue(
() -> coordinator.areSegmentsLoaded(generatedTestConfig.getFullDatasourceName()),
- true,
- 10000,
- 30,
"Real-time generated segments loaded"
);
@@ -531,7 +533,13 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
{
if (generatedTestConfig.getSupervisorId() != null) {
try {
- indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
+ LOG.info("Terminating supervisor");
+ indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
+ // Shutdown all tasks of supervisor
+ List<TaskResponseObject> runningTasks = indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName());
+ for (TaskResponseObject task : runningTasks) {
+ indexer.shutdownTask(task.getId());
+ }
}
catch (Exception e) {
// Best effort cleanup as the supervisor may have already been cleanup
diff --git a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
index 9943431..d921165 100644
--- a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
+++ b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
@@ -50,7 +50,7 @@
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
"taskCount": 2,
"replicas": 1,
- "taskDuration": "PT5M",
+ "taskDuration": "PT30S",
"%%USE_EARLIEST_KEY%%": true,
"inputFormat" : %%INPUT_FORMAT%%
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]