This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f7013e012ce Add new test for handoff API (#16492)
f7013e012ce is described below
commit f7013e012cea2a818da055fc8e121dec5a07ae3a
Author: George Shiqi Wu <[email protected]>
AuthorDate: Tue May 28 12:57:51 2024 -0700
Add new test for handoff API (#16492)
* Add new test for handoff API
* Add new method
* fix test
* Update test
---
.../clients/OverlordResourceTestClient.java | 30 ++++++
.../druid/testing/clients/TaskResponseObject.java | 11 +-
.../tests/indexer/AbstractStreamIndexingTest.java | 115 +++++++++++++++++++++
.../ITKafkaIndexingServiceStopTasksEarlyTest.java | 53 ++++++++++
.../stream/data/supervisor_with_long_duration.json | 57 ++++++++++
5 files changed, 265 insertions(+), 1 deletion(-)
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 2f02c716a85..8167b9b64e1 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -160,6 +160,36 @@ public class OverlordResourceTestClient
}
}
+  /**
+   * Requests an early handoff of the given task groups from the Overlord.
+   *
+   * <p>POSTs {@code taskGroups} as an {@code application/json} body to
+   * {@code supervisor/<dataSource>/taskGroups/handoff} under the indexer URL, then logs the
+   * response code and content.
+   *
+   * @param dataSource value placed in the supervisor path segment; URL-encoded before use
+   * @param taskGroups request body, sent as UTF-8 bytes
+   * @return the raw response; the status code is not validated here
+   * @throws ISE rethrown unchanged when raised while issuing the request
+   * @throws RuntimeException wrapping any other exception
+   */
+  public StatusResponseHolder handoffTaskGroupEarly(
+      String dataSource,
+      String taskGroups
+  )
+  {
+    try {
+      LOG.info("handing off %s %s", dataSource, taskGroups);
+      StatusResponseHolder response = httpClient.go(
+          new Request(
+              HttpMethod.POST,
+              new URL(
+                  StringUtils.format(
+                      "%ssupervisor/%s/taskGroups/handoff",
+                      getIndexerURL(),
+                      StringUtils.urlEncode(dataSource)
+                  )
+              )
+          ).setContent(
+              "application/json",
+              StringUtils.toUtf8(taskGroups)
+          ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      // Pass values as format arguments, like the log call above, so response text is never
+      // placed in the format-string position.
+      LOG.info("Handoff early response code %s", response.getStatus().getCode());
+      LOG.info("Handoff early response %s", response.getContent());
+      return response;
+    }
+    catch (ISE e) {
+      throw e;
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
public List<TaskResponseObject> getAllTasks()
{
return getTasks("tasks");
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
index 3062a6fa04b..c7907ae6fa5 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/clients/TaskResponseObject.java
@@ -32,6 +32,7 @@ public class TaskResponseObject
private final DateTime createdTime;
private final DateTime queueInsertionTime;
private final TaskState status;
+ private final Long duration;
@JsonCreator
private TaskResponseObject(
@@ -39,7 +40,8 @@ public class TaskResponseObject
@JsonProperty("type") String type,
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
- @JsonProperty("status") TaskState status
+ @JsonProperty("status") TaskState status,
+ @JsonProperty("duration") Long duration
)
{
this.id = id;
@@ -47,6 +49,7 @@ public class TaskResponseObject
this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
this.status = status;
+ this.duration = duration;
}
@JsonProperty
@@ -78,4 +81,10 @@ public class TaskResponseObject
{
return status;
}
+
+ /**
+ * Returns the value bound to the {@code duration} JSON property of the task response.
+ * The field is a boxed {@code Long}, so callers should be prepared for {@code null}.
+ * NOTE(review): presumably milliseconds, since the stream indexing test compares it with a
+ * millisecond constant; confirm against the Overlord task API.
+ */
+ @JsonProperty
+ public Long getDuration()
+ {
+ return duration;
+ }
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 9ea1b5c402e..e20c2ea2061 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.testing.IntegrationTestingConfig;
@@ -75,6 +76,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
private static final int STREAM_SHARD_COUNT = 2;
protected static final long CYCLE_PADDING_MS = 100;
+ // Equals the "taskDuration" of PT600S in supervisor_with_long_duration.json, expressed in
+ // milliseconds; keep the two values in sync.
+ private static final int LONG_DURATION_SUPERVISOR_MILLIS = 600 * 1000;
private static final String QUERIES_FILE =
"/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE =
"supervisor_spec_template.json";
@@ -82,6 +84,9 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
private static final String
SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE =
"supervisor_with_idle_behaviour_enabled_spec_template.json";
+ private static final String SUPERVISOR_LONG_DURATION_TEMPLATE_FILE =
+ "supervisor_with_long_duration.json";
+
protected static final String DATA_RESOURCE_ROOT = "/stream/data";
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
@@ -90,6 +95,9 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
protected static final String
SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT,
SUPERVISOR_WITH_IDLE_BEHAVIOUR_ENABLED_SPEC_TEMPLATE_FILE);
+ protected static final String SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH =
+ String.join("/", DATA_RESOURCE_ROOT,
SUPERVISOR_LONG_DURATION_TEMPLATE_FILE);
+
protected static final String SERIALIZER_SPEC_DIR = "serializer";
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
@@ -230,6 +238,113 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
}
}
+  /**
+   * Verifies that a supervisor keeps ingesting after its task group is handed off early.
+   *
+   * <p>Steps: submit the long-duration supervisor spec, write the first half of the events and
+   * wait until they are queryable, call the early-handoff API for task group 0, write the
+   * remaining events and wait until all of them are queryable, then wait for a completed task
+   * and assert that exactly one completed task ran for less than
+   * {@code LONG_DURATION_SUPERVISOR_MILLIS}.
+   *
+   * @param transactionEnabled passed through to {@code createStreamEventWriter}
+   * @throws Exception if any step fails
+   */
+  protected void doTestIndexDataHandoffEarly(
+      @Nullable Boolean transactionEnabled
+  ) throws Exception
+  {
+    final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
+        INPUT_FORMAT,
+        getResourceAsString(JSON_INPUT_FORMAT_PATH)
+    );
+    try (
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+    ) {
+      final String taskSpec = generatedTestConfig
+          .getStreamIngestionPropsTransform()
+          .apply(getResourceAsString(SUPERVISOR_WITH_LONG_DURATION_TEMPLATE_PATH));
+      LOG.info("supervisorSpec: [%s]\n", taskSpec);
+      // Start supervisor
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("Submitted supervisor");
+
+      // Split the data into two rounds: the first half before the handoff, the rest after it
+      final int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
+      final int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND - secondsToGenerateFirstRound;
+      final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
+          new JsonEventSerializer(jsonMapper),
+          EVENTS_PER_SECOND,
+          CYCLE_PADDING_MS
+      );
+      long numWritten = streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateFirstRound,
+          FIRST_EVENT_TIME
+      );
+
+      // Make sure we consume the data written
+      long numWrittenHalf = numWritten;
+      ITRetryUtil.retryUntilTrue(
+          () ->
+              numWrittenHalf == this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+          StringUtils.format(
+              "dataSource[%s] consumed [%,d] events, expected [%,d]",
+              generatedTestConfig.getFullDatasourceName(),
+              this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+              numWritten
+          )
+      );
+
+      // Trigger early handoff
+      StatusResponseHolder response = indexer.handoffTaskGroupEarly(
+          generatedTestConfig.getFullDatasourceName(),
+          jsonMapper.writeValueAsString(
+              ImmutableMap.of(
+                  "taskGroupIds", ImmutableList.of(0)
+              )
+          )
+      );
+      Assert.assertEquals(response.getStatus().getCode(), 200);
+
+      // Load the rest of the data
+      numWritten += streamGenerator.run(
+          generatedTestConfig.getStreamName(),
+          streamEventWriter,
+          secondsToGenerateRemaining,
+          FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
+      );
+
+      // Make sure we consume the rest of the data
+      long numWrittenAll = numWritten;
+      ITRetryUtil.retryUntilTrue(
+          () ->
+              numWrittenAll == this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+          StringUtils.format(
+              "dataSource[%s] consumed [%,d] events, expected [%,d]",
+              generatedTestConfig.getFullDatasourceName(),
+              this.queryHelper.countRows(
+                  generatedTestConfig.getFullDatasourceName(),
+                  Intervals.ETERNITY,
+                  name -> new LongSumAggregatorFactory(name, "count")
+              ),
+              numWritten
+          )
+      );
+
+      // Wait for the early handoff task to complete and check its duration
+      ITRetryUtil.retryUntilTrue(
+          () -> (!indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName()).isEmpty()),
+          "Waiting for Task Completion"
+      );
+
+      final List<TaskResponseObject> completedTasks =
+          indexer.getCompleteTasksForDataSource(generatedTestConfig.getFullDatasourceName());
+      // getDuration() returns a boxed Long; ignore tasks that report no duration so a missing
+      // value surfaces as a failed assertion rather than a NullPointerException while unboxing.
+      final long tasksFinishedEarly = completedTasks
+          .stream()
+          .map(TaskResponseObject::getDuration)
+          .filter(duration -> duration != null && duration < LONG_DURATION_SUPERVISOR_MILLIS)
+          .count();
+      Assert.assertEquals(tasksFinishedEarly, 1);
+    }
+  }
+
void doTestIndexDataWithLosingCoordinator(@Nullable Boolean
transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
new file mode 100644
index 00000000000..e729680159d
--- /dev/null
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.indexer;
+
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.tests.TestNGGroup;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+/**
+ * Kafka integration test that exercises the supervisor early-handoff API by delegating to
+ * {@code doTestIndexDataHandoffEarly}.
+ */
+@Test(groups = TestNGGroup.KAFKA_INDEX)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaIndexingServiceStopTasksEarlyTest extends
+AbstractKafkaIndexingServiceTest
+{
+ @Override
+ public String getTestNamePrefix()
+ {
+ return "kafka_stop_tasks_early";
+ }
+
+ @BeforeClass
+ public void beforeClass() throws Exception
+ {
+ doBeforeClass();
+ }
+
+
+ // Calls the handoff API partway through ingestion and makes sure the tasks eventually
+ // catch up with the data written afterwards. The shared helper also asserts that exactly
+ // one completed task ran for less than the supervisor's configured task duration.
+ // NOTE(review): the API makes no guarantees about handoff timing, so confirm that the
+ // duration assertion cannot be flaky.
+ @Test
+ public void testStopTasksEarly() throws Exception
+ {
+ doTestIndexDataHandoffEarly(false);
+ }
+}
diff --git
a/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json
b/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json
new file mode 100644
index 00000000000..180b5a5e24c
--- /dev/null
+++
b/integration-tests/src/test/resources/stream/data/supervisor_with_long_duration.json
@@ -0,0 +1,57 @@
+{
+ "type": "%%STREAM_TYPE%%",
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "parser": %%PARSER%%,
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": %%DIMENSIONS%%,
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ },
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "MINUTE",
+ "queryGranularity": "NONE"
+ }
+ },
+ "tuningConfig": {
+ "type": "%%STREAM_TYPE%%",
+ "intermediatePersistPeriod": "PT30S",
+ "maxRowsPerSegment": 5000000,
+ "maxRowsInMemory": 500000
+ },
+ "ioConfig": {
+ "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+ "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+ "taskCount": 1,
+ "replicas": 1,
+ "taskDuration": "PT600S",
+ "%%USE_EARLIEST_KEY%%": true,
+ "inputFormat" : %%INPUT_FORMAT%%
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]