This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bd7924  Fix kinesis integration test (#10696)
5bd7924 is described below

commit 5bd792429664d8f5274ffea314f8bdeb296eb88a
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Mon Dec 21 12:57:40 2020 -0800

    Fix kinesis integration test (#10696)
    
    * fix kinesis IT
    
    * fix checkstyle
---
 .../clients/OverlordResourceTestClient.java        | 36 +++++++++++++++++++---
 .../apache/druid/testing/utils/ITRetryUtil.java    |  2 +-
 .../tests/indexer/AbstractStreamIndexingTest.java  | 34 ++++++++++++--------
 .../stream/data/supervisor_spec_template.json      |  2 +-
 4 files changed, 55 insertions(+), 19 deletions(-)

diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 4a5b53c..84604ec 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -306,14 +306,14 @@ public class OverlordResourceTestClient
     }
   }
 
-  public void shutdownSupervisor(String id)
+  public void terminateSupervisor(String id)
   {
     try {
       StatusResponseHolder response = httpClient.go(
           new Request(
               HttpMethod.POST,
               new URL(StringUtils.format(
-                  "%ssupervisor/%s/shutdown",
+                  "%ssupervisor/%s/terminate",
                   getIndexerURL(),
                   StringUtils.urlEncode(id)
               ))
@@ -322,12 +322,40 @@ public class OverlordResourceTestClient
       ).get();
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
         throw new ISE(
-            "Error while shutting down supervisor, response [%s %s]",
+            "Error while terminating supervisor, response [%s %s]",
             response.getStatus(),
             response.getContent()
         );
       }
-      LOG.info("Shutdown supervisor with id[%s]", id);
+      LOG.info("Terminate supervisor with id[%s]", id);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void shutdownTask(String id)
+  {
+    try {
+      StatusResponseHolder response = httpClient.go(
+          new Request(
+              HttpMethod.POST,
+              new URL(StringUtils.format(
+                  "%stask/%s/shutdown",
+                      getIndexerURL(),
+                  StringUtils.urlEncode(id)
+              ))
+          ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while shutdown task, response [%s %s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      LOG.info("Shutdown task with id[%s]", id);
     }
     catch (Exception e) {
       throw new RuntimeException(e);
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java
index d107c87..3ef2f71 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/ITRetryUtil.java
@@ -30,7 +30,7 @@ public class ITRetryUtil
 
   private static final Logger LOG = new Logger(ITRetryUtil.class);
 
-  public static final int DEFAULT_RETRY_COUNT = 120; // 10 minutes
+  public static final int DEFAULT_RETRY_COUNT = 240; // 20 minutes
 
   public static final long DEFAULT_RETRY_SLEEP = TimeUnit.SECONDS.toMillis(5);
 
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 7285af1..5c2a18e 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.TaskResponseObject;
 import org.apache.druid.testing.utils.DruidClusterAdminClient;
 import org.apache.druid.testing.utils.EventSerializer;
 import org.apache.druid.testing.utils.ITRetryUtil;
@@ -488,8 +489,13 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
               name -> new LongSumAggregatorFactory(name, "count")
           ),
         StringUtils.format(
-            "dataSource[%s] consumed [%,d] events",
+            "dataSource[%s] consumed [%,d] events, expected [%,d]",
             generatedTestConfig.getFullDatasourceName(),
+            this.queryHelper.countRows(
+                generatedTestConfig.getFullDatasourceName(),
+                Intervals.ETERNITY,
+                name -> new LongSumAggregatorFactory(name, "count")
+            ),
             numWritten
         )
     );
@@ -499,22 +505,18 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
                                                 
.apply(getResourceAsString(QUERIES_FILE));
     // this query will probably be answered from the indexing tasks but 
possibly from 2 historical segments / 2 indexing
     this.queryHelper.testQueriesFromString(querySpec);
-    LOG.info("Shutting down supervisor");
-    indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
-    // Clear supervisor ID to not shutdown again.
-    generatedTestConfig.setSupervisorId(null);
-    // wait for all indexing tasks to finish
+
+    // All data written to stream within 10 secs.
+    // Each task duration is 30 secs. Hence, one task will be able to consume 
all data from the stream.
     LOG.info("Waiting for all indexing tasks to finish");
     ITRetryUtil.retryUntilTrue(
-        () -> 
(indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size()
 == 0),
-        "Waiting for Tasks Completion"
+        () -> 
(indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).size()
 > 0),
+        "Waiting for Task Completion"
     );
+
     // wait for segments to be handed off
-    ITRetryUtil.retryUntil(
+    ITRetryUtil.retryUntilTrue(
         () -> 
coordinator.areSegmentsLoaded(generatedTestConfig.getFullDatasourceName()),
-        true,
-        10000,
-        30,
         "Real-time generated segments loaded"
     );
 
@@ -531,7 +533,13 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
   {
     if (generatedTestConfig.getSupervisorId() != null) {
       try {
-        indexer.shutdownSupervisor(generatedTestConfig.getSupervisorId());
+        LOG.info("Terminating supervisor");
+        indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
+        // Shutdown all tasks of supervisor
+        List<TaskResponseObject> runningTasks = 
indexer.getUncompletedTasksForDataSource(generatedTestConfig.getFullDatasourceName());
+        for (TaskResponseObject task : runningTasks) {
+          indexer.shutdownTask(task.getId());
+        }
       }
       catch (Exception e) {
         // Best effort cleanup as the supervisor may have already been cleanup
diff --git 
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
 
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
index 9943431..d921165 100644
--- 
a/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
+++ 
b/integration-tests/src/test/resources/stream/data/supervisor_spec_template.json
@@ -50,7 +50,7 @@
     "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
     "taskCount": 2,
     "replicas": 1,
-    "taskDuration": "PT5M",
+    "taskDuration": "PT30S",
     "%%USE_EARLIEST_KEY%%": true,
     "inputFormat" : %%INPUT_FORMAT%%
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to