This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f7013e012ce Add new test for handoff API (#16492)
f7013e012ce is described below

commit f7013e012cea2a818da055fc8e121dec5a07ae3a
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue May 28 12:57:51 2024 -0700

    Add new test for handoff API (#16492)
    
    * Add new test for handoff API
    
    * Add new method
    
    * fix test
    
    * Update test
---
 .../clients/OverlordResourceTestClient.java        |  30 ++++++
 .../druid/testing/clients/TaskResponseObject.java  |  11 +-
 .../tests/indexer/AbstractStreamIndexingTest.java  | 115 +++++++++++++++++++++
 .../ITKafkaIndexingServiceStopTasksEarlyTest.java  |  53 ++++++++++
 .../stream/data/supervisor_with_long_duration.json |  57 ++++++++++
 5 files changed, 265 insertions(+), 1 deletion(-)

diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 2f02c716a85..8167b9b64e1 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -160,6 +160,36 @@ public class OverlordResourceTestClient
     }
   }
 
+  public StatusResponseHolder handoffTaskGroupEarly(
+      String dataSource,
+      String taskGroups
+  )
+  {
+    try {
+      LOG.info("handing off %s %s", dataSource, taskGroups);
+      StatusResponseHolder response = httpClient.go(
+          new Request(HttpMethod.POST, new URL(StringUtils.format(
+              "%ssupervisor/%s/taskGroups/handoff",
+              getIndexerURL(),
+              StringUtils.urlEncode(dataSource)
+          ))).setContent(
+                  "application/json",
+                  StringUtils.toUtf8(taskGroups)
+              ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      LOG.info("Handoff early response code " + 
response.getStatus().getCode());
+      LOG.info("Handoff early response " + response.getContent());
+      return response;
+    }
+    catch (ISE e) {
+      throw e;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public List<TaskResponseObject> getAllTasks()
   {
     return getTasks("tasks");
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
index 3062a6fa04b..c7907ae6fa5 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
@@ -32,6 +32,7 @@ public class TaskResponseObject
   private final DateTime createdTime;
   private final DateTime queueInsertionTime;
   private final TaskState status;
+  private final Long duration;
 
   @JsonCreator
   private TaskResponseObject(
@@ -39,7 +40,8 @@ public class TaskResponseObject
       @JsonProperty("type") String type,
       @JsonProperty("createdTime") DateTime createdTime,
       @JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
-      @JsonProperty("status") TaskState status
+      @JsonProperty("status") TaskState status,
+      @JsonProperty("duration") Long duration
   )
   {
     this.id = id;
@@ -47,6 +49,7 @@ public class TaskResponseObject
     this.createdTime = createdTime;
     this.queueInsertionTime = queueInsertionTime;
     this.status = status;
+    this.duration = duration;
   }
 
   @JsonProperty
@@ -78,4 +81,10 @@ public class TaskResponseObject
   {
     return status;
   }
+
+  @JsonProperty
+  public Long getDuration()
+  {
+    return duration;
+  }
 }
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 9ea1b5c402e..e20c2ea2061 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
 import org.apache.druid.testing.IntegrationTestingConfig;
@@ -75,6 +76,7 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
   private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
   private static final int STREAM_SHARD_COUNT = 2;
   protected static final long CYCLE_PADDING_MS = 100;
+  private static final int LONG_DURATION_SUPERVISOR_MILLIS = 600 * 1000;
 
   private static final String QUERIES_FILE = 
"/stream/queries/stream_index_queries.json";
   private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = 
"supervisor_spec_template.json";
@@ -82,6 +84,9 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
   private static final String 
SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE =
       "supervisor_with_idle_behaviour_enabled_spec_template.json";
 
+  private static final String SUPERVISOR_LONG_DURATION_TEMPLATE_FILE =
+      "supervisor_with_long_duration.json";
+
   protected static final String DATA_RESOURCE_ROOT = "/stream/data";
   protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
       String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
@@ -90,6 +95,9 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
   protected static final String 
SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH =
       String.join("/", DATA_RESOURCE_ROOT, 
SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE);
 
+  protected static final String SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH =
+      String.join("/", DATA_RESOURCE_ROOT, 
SUPERVISOR_LONG_DURATION_TEMPLATE_FILE);
+
   protected static final String SERIALIZER_SPEC_DIR = "serializer";
   protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
   protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
@@ -230,6 +238,113 @@ public abstract class AbstractStreamIndexingTest extends 
AbstractIndexerTest
     }
   }
 
+  protected void doTestIndexDataHandoffEarly(
+      @Nullable Boolean transactionEnabled
+  ) throws Exception
+  {
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
+    try (
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = 
createStreamEventWriter(config, transactionEnabled)
+    ) {
+      final String taskSpec = 
generatedTestConfig.getStreamIngestionPropsTransform()
+          
.apply(getResourceAsString(SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+
+      // Start generating half of the data
+      int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
+      int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      secondsToGenerateRemaining = secondsToGenerateRemaining - 
secondsToGenerateFirstRound;
+      final StreamGenerator streamGenerator = new 
WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      long numWritten = streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateFirstRound,
+          FIRST_EVENT_TIME
+      );
+
+      // Make sure we consume the data written
+      long numWrittenHalf = numWritten;
+      ITRetryUtil.retryUntilTrue(
+          () ->
+              numWrittenHalf == this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+          StringUtils.format(
+              "dataSource[%s] consumed [%,d] events, expected [%,d]",
+              generatedTestConfig.getFullDatasourceName(),
+              this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+              numWritten
+          )
+      );
+
+      // Trigger early handoff
+      StatusResponseHolder response = indexer.handoffTaskGroupEarly(
+          generatedTestConfig.getFullDatasourceName(),
+          jsonMapper.writeValueAsString(
+              ImmutableMap.of(
+                  "taskGroupIds", ImmutableList.of(0)
+              )
+          )
+      );
+      Assert.assertEquals(response.getStatus().getCode(), 200);
+
+      // Load the rest of the data
+      numWritten += streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateRemaining,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+      );
+
+      // Make sure we consume the rest of the data
+      long numWrittenAll = numWritten;
+      ITRetryUtil.retryUntilTrue(
+          () ->
+              numWrittenAll == this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+          StringUtils.format(
+              "dataSource[%s] consumed [%,d] events, expected [%,d]",
+              generatedTestConfig.getFullDatasourceName(),
+              this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+              numWritten
+          )
+      );
+
+      // Wait for the early handoff task to complete and check its duration
+      ITRetryUtil.retryUntilTrue(
+          () -> 
(!indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).isEmpty()),
+          "Waiting for Task Completion"
+      );
+
+      List<TaskResponseObject> completedTasks = 
indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName());
+      Assert.assertEquals(completedTasks.stream().filter(taskResponseObject -> 
taskResponseObject.getDuration() < LONG_DURATION_SUPERVISOR_MILLIS).count(), 1);
+    }
+  }
+
   void doTestIndexDataWithLosingCoordinator(@Nullable Boolean 
transactionEnabled) throws Exception
   {
     testIndexWithLosingNodeHelper(
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
new file mode 100644
index 00000000000..e729680159d
--- /dev/null
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+@Test(groups = TestNGGroup.KAFKA_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceStopTasksEarlyTest extends 
AbstractKafkaIndexingServiceTest
+{
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "kafka_stop_tasks_early";
+  }
+
+  @BeforeClass
+  public void beforeClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+
+  // This test does not actually check whether the tasks stopped early since 
the API does
+  // not make any guarantees about handoff. Instead it makes sure the handoff 
API can be called
+  // and that the tasks will eventually catch up with new data.
+  @Test
+  public void testStopTasksEarly() throws Exception
+  {
+    doTestIndexDataHandoffEarly(false);
+  }
+}
diff --git 
a/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json
 
b/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json
new file mode 100644
index 00000000000..180b5a5e24c
--- /dev/null
+++ 
b/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json
@@ -0,0 +1,57 @@
+{
+  "type": "%%STREAM_TYPE%%",
+  "dataSchema": {
+    "dataSource": "%%DATASOURCE%%",
+    "parser": %%PARSER%%,
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": %%DIMENSIONS%%,
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    },
+    "metricsSpec": [
+      {
+        "type": "count",
+        "name": "count"
+      },
+      {
+        "type": "doubleSum",
+        "name": "added",
+        "fieldName": "added"
+      },
+      {
+        "type": "doubleSum",
+        "name": "deleted",
+        "fieldName": "deleted"
+      },
+      {
+        "type": "doubleSum",
+        "name": "delta",
+        "fieldName": "delta"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "MINUTE",
+      "queryGranularity": "NONE"
+    }
+  },
+  "tuningConfig": {
+    "type": "%%STREAM_TYPE%%",
+    "intermediatePersistPeriod": "PT30S",
+    "maxRowsPerSegment": 5000000,
+    "maxRowsInMemory": 500000
+  },
+  "ioConfig": {
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT600S",
+    "%%USE_EARLIEST_KEY%%": true,
+    "inputFormat" : %%INPUT_FORMAT%%
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to