This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new bdb8f22c054 Migrate Kafka ITs to embedded tests (#18870)
bdb8f22c054 is described below
commit bdb8f22c054cfd5fbb0d11c942cd41f4ba84a48e
Author: Kashif Faraz <[email protected]>
AuthorDate: Sun Jan 4 14:35:21 2026 +0530
Migrate Kafka ITs to embedded tests (#18870)
Changes:
- Add `KafkaResource.produceRecordsWithoutTransaction()`
- Add `KafkaFaultToleranceTest`
- Add `FaultyClusterTest` to replace `ITFaultyClusterTest`
- Fix up `FaultyTaskLockbox`, `FaultyTaskActionClientFactory` on Indexers
- Allow deserialization of `KafkaSupervisorReportPayload`
- Add `KafkaMultiSupervisorTest`
- Add new methods to `EmbeddedKafkaSupervisorTest`
- Remove the following IT groups from GHA workflow
- `kafka-index`
- `kafka-index-slow`
- `kafka-transactional-index`
- `kafka-transactional-index-slow`
---
.github/workflows/cron-job-its.yml | 28 ---
.github/workflows/standard-its.yml | 31 ---
docs/api-reference/supervisor-api.md | 30 ++-
.../embedded/indexing/KafkaFaultToleranceTest.java | 179 +++++++++++++++++
.../testing/embedded/indexing/KafkaTestBase.java | 148 ++++++++++++++
.../testing/embedded/server/FaultyClusterTest.java | 159 +++++++++++++++
.../embedded/server/KafkaMultiSupervisorTest.java | 95 +++++++++
.../supervisor/KafkaSupervisorReportPayload.java | 37 ++--
.../simulate/EmbeddedKafkaSupervisorTest.java | 100 ++++++++-
.../indexing/kafka/simulate/KafkaResource.java | 42 ++++
.../supervisor/KafkaSupervisorSpecBuilder.java | 10 +-
.../apache/druid/guice/ClusterTestingModule.java | 3 +
.../cluster/overlord/FaultyTaskLockbox.java | 31 ++-
.../task/FaultyRemoteTaskActionClientFactory.java | 15 +-
.../druid/indexing/overlord/GlobalTaskLockbox.java | 13 +-
.../overlord/supervisor/SupervisorResource.java | 4 +-
.../tools/WikipediaStreamEventStreamGenerator.java | 11 +-
integration-tests/pom.xml | 16 --
.../druid/testing/utils/KafkaAdminClient.java | 111 ----------
.../java/org/apache/druid/tests/TestNGGroup.java | 8 -
.../coordinator/duty/ITFaultyClusterTest.java | 196 ------------------
.../indexer/AbstractKafkaIndexingServiceTest.java | 219 --------------------
...exingServiceNonTransactionalSerializedTest.java | 70 -------
.../ITKafkaIndexingServiceStopTasksEarlyTest.java | 53 -----
...IndexingServiceTransactionalSerializedTest.java | 70 -------
.../druid/tests/indexer/ITNilColumnTest.java | 223 ---------------------
...ingServiceNonTransactionalParallelizedTest.java | 103 ----------
...dexingServiceTransactionalParallelizedTest.java | 77 -------
.../overlord/supervisor/SupervisorReport.java | 8 +-
.../testing/embedded/EmbeddedClusterApis.java | 14 +-
.../testing/embedded/EmbeddedServiceClient.java | 3 +-
31 files changed, 844 insertions(+), 1263 deletions(-)
diff --git a/.github/workflows/cron-job-its.yml
b/.github/workflows/cron-job-its.yml
index d26ece294c9..1fb0c7a4fe7 100644
--- a/.github/workflows/cron-job-its.yml
+++ b/.github/workflows/cron-job-its.yml
@@ -56,34 +56,6 @@ jobs:
run: |
./it.sh ci
- integration-index-tests-middleManager:
- strategy:
- fail-fast: false
- matrix:
- testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, kafka-data-format, realtime-index]
- uses: ./.github/workflows/reusable-standard-its.yml
- needs: build
- with:
- build_jdk: 17
- runtime_jdk: 17
- testing_groups: -Dgroups=${{ matrix.testing_group }}
- use_indexer: middleManager
- group: ${{ matrix.testing_group }}
-
- integration-index-tests-indexer:
- strategy:
- fail-fast: false
- matrix:
- testing_group: [ kafka-index, kafka-transactional-index, kafka-index-slow, kafka-transactional-index-slow, kafka-data-format ]
- uses: ./.github/workflows/reusable-standard-its.yml
- needs: build
- with:
- build_jdk: 17
- runtime_jdk: 17
- testing_groups: -Dgroups=${{ matrix.testing_group }}
- use_indexer: indexer
- group: ${{ matrix.testing_group }}
-
integration-query-tests-middleManager:
strategy:
fail-fast: false
diff --git a/.github/workflows/standard-its.yml
b/.github/workflows/standard-its.yml
index e195572b6fb..fba85ac0055 100644
--- a/.github/workflows/standard-its.yml
+++ b/.github/workflows/standard-its.yml
@@ -42,37 +42,6 @@ jobs:
core:
- '!extension*/**'
- integration-index-tests-middleManager:
- needs: changes
- strategy:
- fail-fast: false
- matrix:
- testing_group: [kafka-index, kafka-index-slow, kafka-transactional-index, kafka-transactional-index-slow, realtime-index]
- uses: ./.github/workflows/reusable-standard-its.yml
- if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
- with:
- build_jdk: 17
- runtime_jdk: 17
- testing_groups: -Dgroups=${{ matrix.testing_group }}
- override_config_path: ./environment-configs/test-groups/prepopulated-data
- use_indexer: middleManager
- group: ${{ matrix.testing_group }}
-
- integration-index-tests-indexer:
- needs: changes
- strategy:
- fail-fast: false
- matrix:
- testing_group: [kafka-index]
- uses: ./.github/workflows/reusable-standard-its.yml
- if: ${{ needs.changes.outputs.core == 'true' || needs.changes.outputs.common-extensions == 'true' }}
- with:
- build_jdk: 17
- runtime_jdk: 17
- testing_groups: -Dgroups=${{ matrix.testing_group }}
- use_indexer: indexer
- group: ${{ matrix.testing_group }}
-
integration-query-tests-middleManager:
needs: changes
strategy:
diff --git a/docs/api-reference/supervisor-api.md
b/docs/api-reference/supervisor-api.md
index 38e68d4e137..87d7ea65e4e 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -3598,12 +3598,33 @@ Host: http://ROUTER_IP:ROUTER_PORT
### Handoff task groups for a supervisor early
-Trigger handoff for specified task groups of a supervisor early. This is a best effort API and makes no guarantees of handoff execution
+Trigger handoff for specified task groups of a supervisor early. This is a best effort API and makes no guarantees of handoff execution.
#### URL
`POST` `/druid/indexer/v1/supervisor/{supervisorId}/taskGroups/handoff`
+#### Responses
+
+<Tabs>
+
+<TabItem value="1" label="202 ACCEPTED">
+
+*Request has been accepted and handoff will be initiated in the background.*
+
+</TabItem>
+<TabItem value="2" label="404 NOT FOUND">
+
+*Invalid supervisor ID or the supervisor is not running.*
+
+</TabItem>
+<TabItem value="3" label="400 BAD REQUEST">
+
+*Supervisor does not support early handoff.*
+
+</TabItem>
+</Tabs>
+
#### Sample request
The following example shows how to hand off task groups for a supervisor with the name `social_media` that has the task groups `1,2,3`.
@@ -3639,8 +3660,11 @@ Content-Type: application/json
#### Sample response
<details>
- <summary>View the response</summary>
-(empty response)
+ <summary>202 Accepted</summary>
+
+```json
+{}
+```
</details>
### Shut down a supervisor
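For reference, a minimal Java sketch of how a client can exercise the early-handoff endpoint, based on the embedded-test helpers added later in this commit (the supervisor ID and task group IDs are illustrative, and `cluster` is assumed to be a running `EmbeddedDruidCluster`):

```java
// Hypothetical usage; mirrors KafkaFaultToleranceTest.test_supervisorLaunchesNewTask_ifEarlyHandoff.
final String path = StringUtils.format(
    "/druid/indexer/v1/supervisor/%s/taskGroups/handoff",
    "social_media"
);

// POSTs {"taskGroupIds": [1, 2, 3]}. With this change the Overlord replies
// 202 ACCEPTED with an empty JSON object ({}) so the response body can
// always be deserialized by the client.
cluster.callApi().serviceClient().onLeaderOverlord(
    mapper -> new RequestBuilder(HttpMethod.POST, path)
        .jsonContent(mapper, Map.of("taskGroupIds", List.of(1, 2, 3))),
    new TypeReference<Map<String, Object>>() {}
);
```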
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaFaultToleranceTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaFaultToleranceTest.java
new file mode 100644
index 00000000000..abb2993098c
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaFaultToleranceTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.rpc.RequestBuilder;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class KafkaFaultToleranceTest extends KafkaTestBase
+{
+ private SupervisorSpec supervisorSpec = null;
+ private String topic = null;
+ private int totalRecords = 0;
+
+ @BeforeEach
+ public void setupTopicAndSupervisor()
+ {
+ totalRecords = 0;
+ topic = "topic_" + dataSource;
+ kafkaServer.createTopicWithPartitions(topic, 2);
+
+ supervisorSpec = createSupervisor().withId("supe_" +
dataSource).build(dataSource, topic);
+ cluster.callApi().postSupervisor(supervisorSpec);
+ }
+
+ @AfterEach
+ public void verifyAndTearDown()
+ {
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+ verifySupervisorIsRunningHealthy(supervisorSpec.getId());
+ cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+ kafkaServer.deleteTopic(topic);
+ verifyRowCount(totalRecords);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void test_supervisorRecovers_afterOverlordRestart(boolean useTransactions) throws Exception
+ {
+ totalRecords = publish1kRecords(topic, useTransactions);
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ overlord.stop();
+ totalRecords += publish1kRecords(topic, useTransactions);
+
+ overlord.start();
+ totalRecords += publish1kRecords(topic, useTransactions);
+ }
+
+ @Test
+ public void test_supervisorRecovers_afterCoordinatorRestart() throws Exception
+ {
+ final boolean useTransactions = true;
+ totalRecords = publish1kRecords(topic, useTransactions);
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ coordinator.stop();
+ totalRecords += publish1kRecords(topic, useTransactions);
+
+ coordinator.start();
+ totalRecords += publish1kRecords(topic, useTransactions);
+ }
+
+ @Test
+ public void test_supervisorRecovers_afterHistoricalRestart() throws Exception
+ {
+ final boolean useTransactions = false;
+ totalRecords = publish1kRecords(topic, useTransactions);
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ historical.stop();
+ totalRecords += publish1kRecords(topic, useTransactions);
+
+ historical.start();
+ totalRecords += publish1kRecords(topic, useTransactions);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void test_supervisorRecovers_afterSuspendResume(boolean useTransactions)
+ {
+ totalRecords = publish1kRecords(topic, useTransactions);
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+ totalRecords += publish1kRecords(topic, useTransactions);
+
+ cluster.callApi().postSupervisor(supervisorSpec.createRunningSpec());
+ totalRecords += publish1kRecords(topic, useTransactions);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void test_supervisorRecovers_afterChangeInTopicPartitions(boolean useTransactions)
+ {
+ totalRecords = publish1kRecords(topic, useTransactions);
+
+ kafkaServer.increasePartitionsInTopic(topic, 4);
+ totalRecords += publish1kRecords(topic, useTransactions);
+ }
+
+ @Test
+ public void test_supervisorLaunchesNewTask_ifEarlyHandoff()
+ {
+ final boolean useTransactions = true;
+ totalRecords = publish1kRecords(topic, useTransactions);
+
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ final Set<String> taskIdsBeforeHandoff = getRunningTaskIds(dataSource);
+ Assertions.assertFalse(taskIdsBeforeHandoff.isEmpty());
+
+ final String path = StringUtils.format(
+ "/druid/indexer/v1/supervisor/%s/taskGroups/handoff",
+ supervisorSpec.getId()
+ );
+ cluster.callApi().serviceClient().onLeaderOverlord(
+ mapper -> new RequestBuilder(HttpMethod.POST, path)
+ .jsonContent(mapper, Map.of("taskGroupIds", List.of(0, 1))),
+ new TypeReference<>() {}
+ );
+
+ // Wait for the handoff notice to be processed
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("ingest/notices/time")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorSpec.getId())
+ .hasDimension("noticeType", "handoff_task_group_notice")
+ );
+
+ totalRecords += publish1kRecords(topic, useTransactions);
+ waitUntilPublishedRecordsAreIngested(totalRecords);
+
+ // Verify that the running task IDs have changed
+ final Set<String> taskIdsAfterHandoff = getRunningTaskIds(dataSource);
+ Assertions.assertFalse(taskIdsAfterHandoff.isEmpty());
+ Assertions.assertFalse(taskIdsBeforeHandoff.stream().anyMatch(taskIdsAfterHandoff::contains));
+ }
+
+ private Set<String> getRunningTaskIds(String dataSource)
+ {
+ return cluster.callApi()
+ .getTasks(dataSource, "running")
+ .stream()
+ .map(TaskStatusPlus::getId)
+ .collect(Collectors.toSet());
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTestBase.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTestBase.java
new file mode 100644
index 00000000000..3dd9efb4a9e
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaTestBase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.testing.tools.EventSerializer;
+import org.apache.druid.testing.tools.JsonEventSerializer;
+import org.apache.druid.testing.tools.StreamGenerator;
+import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public abstract class KafkaTestBase extends EmbeddedClusterTestBase
+{
+ protected final KafkaResource kafkaServer = new KafkaResource();
+ protected final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ protected final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ protected final EmbeddedBroker broker = new EmbeddedBroker();
+ protected final EmbeddedHistorical historical = new EmbeddedHistorical();
+ protected final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .useContainerFriendlyHostname()
+ .addExtension(KafkaIndexTaskModule.class)
+ .useLatchableEmitter()
+ .useDefaultTimeoutForLatchableEmitter(60)
+ .addResource(kafkaServer)
+ .addServer(indexer)
+ .addServer(coordinator)
+ .addServer(overlord)
+ .addServer(broker)
+ .addServer(historical)
+ .addServer(new EmbeddedRouter());
+ }
+
+ protected KafkaSupervisorSpecBuilder createSupervisor()
+ {
+ return MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)))
+ .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1))
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskCount(2)
+ );
+ }
+
+ /**
+ * Waits until the number of processed events matches {@code expectedRowCount}.
+ */
+ protected void waitUntilPublishedRecordsAreIngested(int expectedRowCount)
+ {
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/events/processed")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(expectedRowCount)
+ );
+
+ final int totalEventsProcessed = indexer
+ .latchableEmitter()
+ .getMetricValues("ingest/events/processed",
Map.of(DruidMetrics.DATASOURCE, dataSource))
+ .stream()
+ .mapToInt(Number::intValue)
+ .sum();
+ Assertions.assertEquals(expectedRowCount, totalEventsProcessed);
+ }
+
+ protected void verifySupervisorIsRunningHealthy(String supervisorId)
+ {
+ final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisorId);
+ Assertions.assertTrue(status.isHealthy());
+ Assertions.assertFalse(status.isSuspended());
+ Assertions.assertEquals("RUNNING", status.getState());
+ }
+
+ /**
+ * Verifies that the row count in {@link #dataSource} matches the {@code expectedRowCount}.
+ */
+ protected void verifyRowCount(int expectedRowCount)
+ {
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+ cluster.callApi().verifySqlQuery(
+ "SELECT COUNT(*) FROM %s",
+ dataSource,
+ String.valueOf(expectedRowCount)
+ );
+ }
+
+ protected int publish1kRecords(String topic, boolean useTransactions)
+ {
+ final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
+ final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 100, 100);
+ List<byte[]> records = streamGenerator.generateEvents(10);
+
+ ArrayList<ProducerRecord<byte[], byte[]>> producerRecords = new ArrayList<>();
+ for (byte[] record : records) {
+ producerRecords.add(new ProducerRecord<>(topic, record));
+ }
+
+ if (useTransactions) {
+ kafkaServer.produceRecordsToTopic(producerRecords);
+ } else {
+ kafkaServer.produceRecordsWithoutTransaction(producerRecords);
+ }
+ return producerRecords.size();
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/FaultyClusterTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/FaultyClusterTest.java
new file mode 100644
index 00000000000..a1a1622f535
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/FaultyClusterTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.guice.ClusterTestingModule;
+import org.apache.druid.indexer.TaskStatusPlus;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorReportPayload;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.metadata.PendingSegmentRecord;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.indexing.KafkaTestBase;
+import org.hamcrest.Matchers;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration test to verify induction of various faults in the cluster
+ * using {@code ClusterTestingTaskConfig}.
+ * <p>
+ * Future tests can try to leverage the cluster testing config to verify cluster
+ * scalability and stability.
+ */
+public class FaultyClusterTest extends KafkaTestBase
+{
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ return super
+ .createCluster()
+ .addExtension(ClusterTestingModule.class)
+ .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
+ .addCommonProperty("druid.unsafe.cluster.testing", "true");
+ }
+
+ @Test
+ public void test_overlord_skipsCleanupOfPendingSegments()
+ {
+ final Map<String, Object> taskContext = Map.of(
+ "clusterTesting",
+ Map.of("metadataConfig", Map.of("cleanupPendingSegments", false))
+ );
+
+ // Set up the topic and supervisor
+ final String topic = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic, 1);
+ final KafkaSupervisorSpec supervisorSpec = createSupervisor()
+ .withIoConfig(io -> io.withTaskCount(1))
+ .withContext(taskContext)
+ .withId("supe_" + dataSource)
+ .build(dataSource, topic);
+ cluster.callApi().postSupervisor(supervisorSpec);
+
+ final int recordCount = publish1kRecords(topic, true);
+ waitUntilPublishedRecordsAreIngested(recordCount);
+
+ cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+ kafkaServer.deleteTopic(topic);
+
+ verifyRowCount(recordCount);
+
+ // Verify that pending segments are not cleaned up
+ final List<PendingSegmentRecord> pendingSegments = overlord
+ .bindings()
+ .segmentsMetadataStorage()
+ .getPendingSegments(dataSource, Intervals.ETERNITY);
+ Assertions.assertFalse(pendingSegments.isEmpty());
+ }
+
+ @Test
+ @Timeout(60)
+ public void test_supervisor_reportsMagnifiedLag()
+ {
+ // Set up the topic and supervisor
+ final String topic = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic, 2);
+
+ final int lagMultiplier = 1_000_000;
+ final KafkaSupervisorSpec supervisorSpec = createSupervisor()
+ .withIoConfig(
+ io -> io
+ .withTaskCount(2)
+ .withLagAggregator(new FaultyLagAggregator(lagMultiplier))
+ )
+ .withContext(
// Delay segment allocation so that tasks are not able to ingest anything
+ Map.of(
+ "clusterTesting",
+ Map.of("taskActionClientConfig",
Map.of("segmentAllocateDelay", "P1D"))
+ )
+ )
+ .withTuningConfig(t -> t.withOffsetFetchPeriod(Period.millis(10)))
+ .withId("supe_" + dataSource)
+ .build(dataSource, topic);
+
+ cluster.callApi().postSupervisor(supervisorSpec);
+
+ // Publish records to build up some lag
+ final int totalPublishedRecords = publish1kRecords(topic, true);
+
+ // Wait for supervisor to report the expected lag
+ final long expectedLag = (long) totalPublishedRecords * lagMultiplier;
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("ingest/kafka/lag")
+ .hasDimension(DruidMetrics.SUPERVISOR_ID, supervisorSpec.getId())
+ .hasValueMatching(Matchers.greaterThanOrEqualTo(expectedLag))
+ );
+
+ final String path = StringUtils.format("/druid/indexer/v1/supervisor/%s/status", supervisorSpec.getId());
+ final SupervisorReport<KafkaSupervisorReportPayload> report = cluster.callApi().serviceClient().onLeaderOverlord(
+ mapper -> new RequestBuilder(HttpMethod.GET, path),
+ new TypeReference<>() {}
+ );
+
+ Assertions.assertNotNull(report);
+ final KafkaSupervisorReportPayload payload = report.getPayload();
+ Assertions.assertNotNull(payload);
+
+ Assertions.assertFalse(payload.isSuspended());
+ Assertions.assertTrue(payload.getAggregateLag() >= expectedLag);
+
+ // Kill ongoing tasks and suspend the supervisor
+ for (TaskStatusPlus task : cluster.callApi().getTasks(dataSource, "running")) {
+ cluster.callApi().onLeaderOverlord(o -> o.cancelTask(task.getId()));
+ }
+ cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+ kafkaServer.deleteTopic(topic);
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/KafkaMultiSupervisorTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/KafkaMultiSupervisorTest.java
new file mode 100644
index 00000000000..8d2123a52b3
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/KafkaMultiSupervisorTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.testing.embedded.indexing.KafkaTestBase;
+import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.List;
+
+/**
+ * Contains miscellaneous tests for Kafka supervisors.
+ */
+public class KafkaMultiSupervisorTest extends KafkaTestBase
+{
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void test_ingestIntoSingleDatasource_fromDifferentTopics(boolean useTransactions)
+ {
+ final List<DimensionSchema> dimensionsSpec = DimensionsSpec.getDefaultSchemas(
+ List.of("kafka.topic", WikipediaStreamEventStreamGenerator.COL_UNIQUE_NAMESPACE)
+ );
+
+ // Set up first topic and supervisor
+ final String topic1 = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic1, 1);
+
+ final KafkaSupervisorSpec supervisor1 = createSupervisor()
+ .withIoConfig(io -> io.withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)))
+ .withDataSchema(d -> d.withDimensions(dimensionsSpec))
+ .withId(topic1)
+ .build(dataSource, topic1);
+ cluster.callApi().postSupervisor(supervisor1);
+
+ // Set up another topic and supervisor
+ final String topic2 = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic2, 1);
+
+ final KafkaSupervisorSpec supervisor2 = createSupervisor()
+ .withIoConfig(io -> io.withKafkaInputFormat(new JsonInputFormat(null, null, null, null, null)))
+ .withDataSchema(d -> d.withDimensions(dimensionsSpec))
+ .withId(topic2)
+ .build(dataSource, topic2);
+ cluster.callApi().postSupervisor(supervisor2);
+
+ // Produce records to both topics
+ final int rowCountTopic1 = publish1kRecords(topic1, useTransactions)
+ + publish1kRecords(topic1, useTransactions);
+ final int rowCountTopic2 = publish1kRecords(topic2, useTransactions);
+
+ final int totalRowCount = rowCountTopic1 + rowCountTopic2;
+ waitUntilPublishedRecordsAreIngested(totalRowCount);
+
+ // Tear down both topics and supervisors
+ kafkaServer.deleteTopic(topic1);
+ cluster.callApi().postSupervisor(supervisor1.createSuspendedSpec());
+ kafkaServer.deleteTopic(topic2);
+ cluster.callApi().postSupervisor(supervisor2.createSuspendedSpec());
+
+ // Verify total row count and row count for each topic
+ verifyRowCount(totalRowCount);
+ Assertions.assertEquals(
+ "2000",
+ cluster.runSql("SELECT COUNT(*) FROM %s WHERE \"kafka.topic\" = '%s'",
dataSource, topic1)
+ );
+ Assertions.assertEquals(
+ "1000",
+ cluster.runSql("SELECT COUNT(*) FROM %s WHERE \"kafka.topic\" = '%s'",
dataSource, topic2)
+ );
+ }
+}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
index 897d1655ab8..589e42e6175 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorReportPayload.java
@@ -19,6 +19,8 @@
package org.apache.druid.indexing.kafka.supervisor;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
@@ -30,29 +32,30 @@ import java.util.Map;
public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReportPayload<KafkaTopicPartition, Long>
{
+ @JsonCreator
public KafkaSupervisorReportPayload(
- String id,
- String dataSource,
- String topic,
- int partitions,
- int replicas,
- long durationSeconds,
- @Nullable Map<KafkaTopicPartition, Long> latestOffsets,
- @Nullable Map<KafkaTopicPartition, Long> minimumLag,
- @Nullable Map<KafkaTopicPartition, Long> minimumLagMillis,
- @Nullable Long aggregateLag,
- @Nullable DateTime offsetsLastUpdated,
- boolean suspended,
- boolean healthy,
- SupervisorStateManager.State state,
- SupervisorStateManager.State detailedState,
- List<SupervisorStateManager.ExceptionEvent> recentErrors
+ @JsonProperty("id") String id,
+ @JsonProperty("dataSource") String dataSource,
+ @JsonProperty("stream") String stream,
+ @JsonProperty("partitions") int partitions,
+ @JsonProperty("replicas") int replicas,
+ @JsonProperty("durationSeconds") long durationSeconds,
+ @JsonProperty("latestOffsets") @Nullable Map<KafkaTopicPartition, Long>
latestOffsets,
+ @JsonProperty("minimumLag") @Nullable Map<KafkaTopicPartition, Long>
minimumLag,
+ @JsonProperty("minimumLagMillis") @Nullable Map<KafkaTopicPartition,
Long> minimumLagMillis,
+ @JsonProperty("aggregateLag") @Nullable Long aggregateLag,
+ @JsonProperty("offsetsLastUpdated") @Nullable DateTime
offsetsLastUpdated,
+ @JsonProperty("suspended") boolean suspended,
+ @JsonProperty("healthy") boolean healthy,
+ @JsonProperty("ignored1") SupervisorStateManager.State state, // No
serializer for State class
+ @JsonProperty("ignored2") SupervisorStateManager.State detailedState,
+ @JsonProperty("recentErrors")
List<SupervisorStateManager.ExceptionEvent> recentErrors
)
{
super(
id,
dataSource,
- topic,
+ stream,
partitions,
replicas,
durationSeconds,
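With the `@JsonCreator` constructor in place, the status report can now be round-tripped through Jackson. A minimal sketch of the client-side deserialization this enables (the `mapper` setup and `statusJson` input are assumed):

```java
// Hypothetical deserialization of a supervisor /status response body.
final SupervisorReport<KafkaSupervisorReportPayload> report = mapper.readValue(
    statusJson,
    new TypeReference<SupervisorReport<KafkaSupervisorReportPayload>>() {}
);

// FaultyClusterTest reads the payload this way to assert on the reported lag.
final KafkaSupervisorReportPayload payload = report.getPayload();
final Long aggregateLag = payload.getAggregateLag();
```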
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index c2517331035..6f0fa52317d 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -45,6 +46,7 @@ import
org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.joda.time.DateTime;
import org.joda.time.Interval;
+import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -55,10 +57,14 @@ import java.util.concurrent.ThreadLocalRandom;
public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
{
+ private static final String COL_ITEM = "item";
+ private static final String COL_TIMESTAMP = "timestamp";
+
private final EmbeddedBroker broker = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedHistorical historical = new EmbeddedHistorical();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private KafkaResource kafkaServer;
@Override
@@ -72,7 +78,7 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
cluster.addExtension(KafkaIndexTaskModule.class)
.addResource(kafkaServer)
.useLatchableEmitter()
- .addServer(new EmbeddedCoordinator())
+ .addServer(coordinator)
.addServer(overlord)
.addServer(indexer)
.addServer(historical)
@@ -94,7 +100,8 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
// Submit and start a supervisor
final String supervisorId = dataSource + "_supe";
- final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(supervisorId, topic);
+ final KafkaSupervisorSpec kafkaSupervisorSpec
+ = newKafkaSupervisor().withId(supervisorId).build(dataSource, topic);
Assertions.assertEquals(supervisorId, cluster.callApi().postSupervisor(kafkaSupervisorSpec));
@@ -142,12 +149,87 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
Assertions.assertEquals(0, lockedIntervals.size());
}
- private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId,
String topic)
+ @Test
+ public void test_supervisorBecomesIdle_ifTopicHasNoData()
+ {
+ final String topic = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic, 2);
+
+ final long idleAfterMillis = 100L;
+ final KafkaSupervisorSpec supervisorSpec = newKafkaSupervisor()
+ .withIoConfig(ioConfig -> ioConfig.withIdleConfig(new IdleConfig(true, idleAfterMillis)).withTaskCount(1))
+ .build(dataSource, topic);
+ cluster.callApi().postSupervisor(supervisorSpec);
+
+ // Wait for the first set of tasks to finish
+ overlord.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ );
+
+ // Verify that the supervisor is now idle
+ final SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisorSpec.getId());
+ Assertions.assertFalse(status.isSuspended());
+ Assertions.assertTrue(status.isHealthy());
+ Assertions.assertEquals("IDLE", status.getState());
+
+ cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+ kafkaServer.deleteTopic(topic);
+ }
+
+ @Test
+ public void test_runSupervisor_withEmptyDimension()
+ {
+ final String topic = IdUtils.getRandomId();
+ kafkaServer.createTopicWithPartitions(topic, 2);
+
+ final String emptyColumn = "unknownColumn";
+ final KafkaSupervisorSpec supervisorSpec = newKafkaSupervisor()
+ .withDataSchema(
+ s -> s.withDimensions(
+ DimensionsSpec.getDefaultSchemas(List.of(emptyColumn, COL_ITEM))
+ )
+ )
+ .build(dataSource, topic);
+ cluster.callApi().postSupervisor(supervisorSpec);
+
+ final int numRows = 100;
+ kafkaServer.produceRecordsToTopic(generateRecordsForTopic(topic, numRows, DateTimes.nowUtc()));
+
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/events/processed")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(numRows)
+ );
+
+ cluster.callApi().postSupervisor(supervisorSpec.createSuspendedSpec());
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+ Assertions.assertEquals(
+ "100",
+ cluster.runSql("SELECT COUNT(*) FROM %s WHERE %s IS NULL", dataSource,
emptyColumn)
+ );
+ Assertions.assertEquals(
+ "0",
+ cluster.runSql("SELECT COUNT(*) FROM %s WHERE %s IS NOT NULL",
dataSource, emptyColumn)
+ );
+ Assertions.assertEquals(
+ StringUtils.format("%s,YES,VARCHAR", emptyColumn),
+ cluster.runSql(
+ "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
+ + " FROM INFORMATION_SCHEMA.COLUMNS"
+ + " WHERE TABLE_NAME = '%s' AND COLUMN_NAME = '%s'",
+ dataSource, emptyColumn
+ )
+ );
+ }
+
+ private KafkaSupervisorSpecBuilder newKafkaSupervisor()
{
return new KafkaSupervisorSpecBuilder()
.withDataSchema(
schema -> schema
- .withTimestamp(new TimestampSpec("timestamp", null, null))
+ .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
.withDimensions(DimensionsSpec.EMPTY)
)
.withTuningConfig(
@@ -157,12 +239,14 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
)
.withIoConfig(
ioConfig -> ioConfig
- .withInputFormat(new CsvInputFormat(List.of("timestamp",
"item"), null, null, false, 0, false))
+ .withInputFormat(new CsvInputFormat(List.of(COL_TIMESTAMP,
COL_ITEM), null, null, false, 0, false))
.withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskDuration(Period.millis(500))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withCompletionTimeout(Period.seconds(5))
.withUseEarliestSequenceNumber(true)
- )
- .withId(supervisorId)
- .build(dataSource, topic);
+ );
}
private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic(
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
index 18e3c79503d..0b4bbfb978b 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/KafkaResource.java
@@ -23,8 +23,11 @@ import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.TestcontainerResource;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreatePartitionsResult;
+import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.testcontainers.kafka.KafkaContainer;
@@ -119,6 +122,26 @@ public class KafkaResource extends
TestcontainerResource<KafkaContainer>
}
}
+ /**
+ * Increases the number of partitions in the given Kafka topic. The topic must
+ * already exist. This method waits until the increase in the partition count
+ * has started (but not necessarily finished).
+ */
+ public void increasePartitionsInTopic(String topic, int newPartitionCount)
+ {
+ try (Admin admin = newAdminClient()) {
+ final CreatePartitionsResult result = admin.createPartitions(
+ Map.of(topic, NewPartitions.increaseTo(newPartitionCount))
+ );
+
+ // Wait for the partitioning to start
+ result.values().get(topic).get();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public void produceRecordsToTopic(
List<ProducerRecord<byte[], byte[]>> records,
Map<String, Object> extraProducerProperties
@@ -145,6 +168,25 @@ public class KafkaResource extends
TestcontainerResource<KafkaContainer>
produceRecordsToTopic(records, null);
}
+ /**
+ * Produces records to a topic of this embedded Kafka server without using
+ * Kafka transactions.
+ */
+ public void produceRecordsWithoutTransaction(List<ProducerRecord<byte[], byte[]>> records)
+ {
+ final Map<String, Object> props = producerProperties();
+ props.remove(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(props)) {
+ for (ProducerRecord<byte[], byte[]> record : records) {
+ kafkaProducer.send(record);
+ }
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public Map<String, Object> producerProperties()
{
final Map<String, Object> props = new HashMap<>(commonClientProperties());
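Together with the existing helpers, the new `KafkaResource` methods cover the full topic lifecycle a test needs. A short usage sketch (topic name and record payload are illustrative):

```java
// Hypothetical usage of the helpers touched in this commit.
kafkaServer.createTopicWithPartitions("demo_topic", 2);  // existing helper
kafkaServer.increasePartitionsInTopic("demo_topic", 4);  // new: returns once the increase has started

final List<ProducerRecord<byte[], byte[]>> records = List.of(
    new ProducerRecord<>("demo_topic", StringUtils.toUtf8("{\"timestamp\":\"2026-01-04T00:00:00Z\"}"))
);
kafkaServer.produceRecordsToTopic(records);              // transactional producer (existing)
kafkaServer.produceRecordsWithoutTransaction(records);   // new: plain sends without a Kafka transaction
```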
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
index ce36c0b7dc0..4cd8c523185 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka.supervisor;
import org.apache.druid.segment.indexing.DataSchema;
+import java.util.Map;
import java.util.function.Consumer;
/**
@@ -33,6 +34,7 @@ public class KafkaSupervisorSpecBuilder
private final DataSchema.Builder dataSchema = new DataSchema.Builder();
private final KafkaIOConfigBuilder ioConfig = new KafkaIOConfigBuilder();
private final KafkaTuningConfigBuilder tuningConfig = new KafkaTuningConfigBuilder();
+ private Map<String, Object> context;
public KafkaSupervisorSpecBuilder withDataSchema(Consumer<DataSchema.Builder> updateDataSchema)
{
@@ -52,6 +54,12 @@ public class KafkaSupervisorSpecBuilder
return this;
}
+ public KafkaSupervisorSpecBuilder withContext(Map<String, Object> context)
+ {
+ this.context = context;
+ return this;
+ }
+
public KafkaSupervisorSpecBuilder withId(String id)
{
this.id = id;
@@ -87,7 +95,7 @@ public class KafkaSupervisorSpecBuilder
dataSchema.build(),
tuningConfig.build(),
ioConfig.build(),
- null,
+ context,
false,
// Jackson injected params, not needed while posting a supervisor to the Overlord
null, null, null, null, null, null, null, null, null
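The new `withContext` hook is what lets embedded tests inject the `clusterTesting` configuration into a supervisor spec. A brief sketch of builder usage, mirroring `FaultyClusterTest` (datasource, topic, and config values are illustrative):

```java
// Hypothetical spec with a fault-injection context that disables
// cleanup of pending segments on task completion.
final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
    .withContext(Map.of(
        "clusterTesting",
        Map.of("metadataConfig", Map.of("cleanupPendingSegments", false))
    ))
    .withId("supe_demo")
    .build("demo_datasource", "demo_topic");
```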
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
index 4c48d1ae760..22f20772f8f 100644
---
a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java
@@ -116,6 +116,9 @@ public class ClusterTestingModule implements DruidModule
binder.bind(OverlordClient.class)
.to(FaultyOverlordClient.class)
.in(LazySingleton.class);
+ binder.bind(RemoteTaskActionClientFactory.class)
+ .to(FaultyRemoteTaskActionClientFactory.class)
+ .in(LazySingleton.class);
}
}
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
index 67ce9e829ae..d10cf6f9880 100644
---
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java
@@ -40,6 +40,8 @@ public class FaultyTaskLockbox extends GlobalTaskLockbox
private static final Logger log = new Logger(FaultyTaskLockbox.class);
private final ObjectMapper mapper;
+ private final TaskStorage taskStorage;
+ private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@Inject
public FaultyTaskLockbox(
@@ -50,21 +52,30 @@ public class FaultyTaskLockbox extends GlobalTaskLockbox
{
super(taskStorage, metadataStorageCoordinator);
this.mapper = mapper;
+ this.taskStorage = taskStorage;
+ this.metadataStorageCoordinator = metadataStorageCoordinator;
log.info("Initializing FaultyTaskLockbox.");
}
@Override
- protected void cleanupPendingSegments(Task task)
+ protected TaskLockbox createLockbox(String dataSource)
{
- final ClusterTestingTaskConfig testingConfig = getTestingConfig(task);
- if (testingConfig.getMetadataConfig().isCleanupPendingSegments()) {
- super.cleanupPendingSegments(task);
- } else {
- log.info(
- "Skipping cleanup of pending segments for task[%s] since it has
testing config[%s].",
- task.getId(), testingConfig
- );
- }
+ return new TaskLockbox(dataSource, taskStorage, metadataStorageCoordinator)
+ {
+ @Override
+ protected void cleanupPendingSegments(Task task)
+ {
+ final ClusterTestingTaskConfig testingConfig = getTestingConfig(task);
+ if (testingConfig.getMetadataConfig().isCleanupPendingSegments()) {
+ super.cleanupPendingSegments(task);
+ } else {
+ log.info(
+ "Skipping cleanup of pending segments for task[%s] since it has
testing config[%s].",
+ task.getId(), testingConfig
+ );
+ }
+ }
+ };
}
private ClusterTestingTaskConfig getTestingConfig(Task task)
diff --git
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
index 043a49f6d14..f6115c19928 100644
---
a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
+++
b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/task/FaultyRemoteTaskActionClientFactory.java
@@ -34,15 +34,14 @@ import
org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
public class FaultyRemoteTaskActionClientFactory extends RemoteTaskActionClientFactory
{
- private final ClusterTestingTaskConfig config;
+ private final ObjectMapper jsonMapper;
@Inject
public FaultyRemoteTaskActionClientFactory(
@Json final ObjectMapper jsonMapper,
@EscalatedGlobal final ServiceClientFactory clientFactory,
@IndexingService final ServiceLocator serviceLocator,
- RetryPolicyConfig retryPolicyConfig,
- ClusterTestingTaskConfig config
+ RetryPolicyConfig retryPolicyConfig
)
{
super(
@@ -51,12 +50,18 @@ public class FaultyRemoteTaskActionClientFactory extends
RemoteTaskActionClientF
serviceLocator,
retryPolicyConfig
);
- this.config = config;
+ this.jsonMapper = jsonMapper;
}
@Override
public TaskActionClient create(Task task)
{
- return new FaultyRemoteTaskActionClient(config.getTaskActionClientConfig(), super.create(task));
+ try {
+ final ClusterTestingTaskConfig config = ClusterTestingTaskConfig.forTask(task, jsonMapper);
+ return new FaultyRemoteTaskActionClient(config.getTaskActionClientConfig(), super.create(task));
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
index a464af86868..8b00739167a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java
@@ -277,14 +277,11 @@ public class GlobalTaskLockbox
}
/**
- * Cleans up pending segments associated with the given task, if any.
+ * Creates a new {@link TaskLockbox} for the given datasource.
*/
- protected void cleanupPendingSegments(Task task)
+ protected TaskLockbox createLockbox(String dataSource)
{
- executeForTask(
- task,
- lockbox -> lockbox.cleanupPendingSegments(task)
- );
+ return new TaskLockbox(dataSource, taskStorage, metadataStorageCoordinator);
}
/**
@@ -484,9 +481,7 @@ public class GlobalTaskLockbox
(ds, existingResource) -> {
final DatasourceLockboxResource resource = Objects.requireNonNullElseGet(
existingResource,
- () -> new DatasourceLockboxResource(
- new TaskLockbox(ds, taskStorage, metadataStorageCoordinator)
- )
+ () -> new DatasourceLockboxResource(createLockbox(datasource))
);
// Verify sync is complete before acquiring the resource
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index fdf23850580..b0ed0423818 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -432,7 +432,9 @@ public class SupervisorResource
manager -> {
try {
if (manager.handoffTaskGroupsEarly(id, taskGroupIds)) {
- return Response.ok().build();
+ return Response.status(Response.Status.ACCEPTED)
+ .entity(Map.of()) // empty json object to allow deserialization by the client
+ .build();
} else {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error",
StringUtils.format("Supervisor was not found [%s]", id)))
diff --git
a/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
b/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
index c357afc0e3d..b26c999ed83 100644
---
a/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
+++
b/integration-tests-ex/tools/src/main/java/org/apache/druid/testing/tools/WikipediaStreamEventStreamGenerator.java
@@ -19,6 +19,7 @@
package org.apache.druid.testing.tools;
+import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.java.util.common.Pair;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
@@ -30,6 +31,14 @@ import java.util.List;
public class WikipediaStreamEventStreamGenerator extends SyntheticStreamGenerator
{
+ /**
+ * Column which may serve as a unique identifier for each record. This column
+ * is used instead of a dedicated "id" column since some tests like
+ * {@code KafkaDataFormatsTest} use hardcoded schemas that rely on these
+ * specific column names.
+ */
+ public static final String COL_UNIQUE_NAMESPACE = "namespace";
+
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
public WikipediaStreamEventStreamGenerator(EventSerializer serializer, int eventsPerSeconds, long cyclePaddingMs)
@@ -49,7 +58,7 @@ public class WikipediaStreamEventStreamGenerator extends
SyntheticStreamGenerato
event.add(Pair.of("newPage", "true"));
event.add(Pair.of("robot", "false"));
event.add(Pair.of("anonymous", "false"));
- event.add(Pair.of("namespace", "article"));
+ event.add(Pair.of(COL_UNIQUE_NAMESPACE, "article " +
IdUtils.getRandomId()));
event.add(Pair.of("continent", "North America"));
event.add(Pair.of("country", "United States"));
event.add(Pair.of("region", "Bay Area"));
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 018b387a393..51903553ae9 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -179,17 +179,6 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.druid.extensions</groupId>
- <artifactId>druid-kafka-indexing-service</artifactId>
- <version>${project.parent.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-lookups-cached-global</artifactId>
@@ -238,11 +227,6 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>${apache.kafka.version}</version>
- </dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
deleted file mode 100644
index dde9bb5a035..00000000000
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaAdminClient.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.testing.utils;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.CreatePartitionsResult;
-import org.apache.kafka.clients.admin.CreateTopicsResult;
-import org.apache.kafka.clients.admin.DeleteTopicsResult;
-import org.apache.kafka.clients.admin.DescribeTopicsResult;
-import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.clients.admin.TopicDescription;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-public class KafkaAdminClient implements StreamAdminClient
-{
- private AdminClient adminClient;
-
- public KafkaAdminClient(IntegrationTestingConfig config)
- {
- Properties properties = new Properties();
- properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
config.getKafkaHost());
- KafkaUtil.addPropertiesFromTestConfig(config, properties);
- adminClient = AdminClient.create(properties);
- }
-
- @Override
- public void createStream(String streamName, int partitionCount, Map<String,
String> tags) throws Exception
- {
- final short replicationFactor = 1;
- final NewTopic newTopic = new NewTopic(streamName, partitionCount,
replicationFactor);
- final CreateTopicsResult createTopicsResult =
adminClient.createTopics(Collections.singleton(newTopic));
- // Wait for topic creation to complete
- createTopicsResult.values().get(streamName).get();
- }
-
- @Override
- public void deleteStream(String streamName) throws Exception
- {
- DeleteTopicsResult deleteTopicsResult =
adminClient.deleteTopics(ImmutableList.of(streamName));
- deleteTopicsResult.values().get(streamName).get();
- }
-
- /**
- * This method can only increase the partition count of {@code streamName}
- * to a final partition count of {@code newPartitionCount}.
- * If {@code blocksUntilStarted} is true, this method blocks until the
- * repartitioning has started (though not necessarily finished); otherwise,
- * it returns right after issuing the repartitioning command.
- */
- @Override
- public void updatePartitionCount(String streamName, int newPartitionCount,
boolean blocksUntilStarted) throws Exception
- {
- Map<String, NewPartitions> counts = new HashMap<>();
- counts.put(streamName, NewPartitions.increaseTo(newPartitionCount));
- CreatePartitionsResult createPartitionsResult =
adminClient.createPartitions(counts);
- if (blocksUntilStarted) {
- createPartitionsResult.values().get(streamName).get();
- }
- }
-
- /**
- * Stream state such as active/non-active does not apply to Kafka.
- * Returns true since a Kafka stream is always active and can always be
- * written to and read from.
- */
- @Override
- public boolean isStreamActive(String streamName)
- {
- return true;
- }
-
- @Override
- public int getStreamPartitionCount(String streamName) throws Exception
- {
- DescribeTopicsResult result =
adminClient.describeTopics(ImmutableList.of(streamName));
- TopicDescription topicDescription = result.values().get(streamName).get();
- return topicDescription.partitions().size();
- }
-
- @Override
- public boolean verfiyPartitionCountUpdated(String streamName, int
oldPartitionCount, int newPartitionCount) throws Exception
- {
- return getStreamPartitionCount(streamName) == newPartitionCount;
- }
-}
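The deleted helper above was a thin wrapper over Kafka's own admin API. For anyone who relied on it outside these ITs, the same operations are available directly from kafka-clients; the sketch below covers topic creation, an increase-only partition bump, and a partition-count check. The bootstrap address and topic name are placeholders, and error handling is elided.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;

public class KafkaTopicOpsSketch
{
  public static void main(String[] args) throws Exception
  {
    // "localhost:9092" is a placeholder; point this at the test broker.
    final Properties props = new Properties();
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      // Create a topic with 2 partitions and replication factor 1, blocking
      // until the broker acknowledges the creation.
      admin.createTopics(List.of(new NewTopic("demo-topic", 2, (short) 1)))
           .values().get("demo-topic").get();

      // Kafka partition counts can only grow: bump 2 -> 4 and block until
      // the repartitioning request has been accepted.
      admin.createPartitions(Map.of("demo-topic", NewPartitions.increaseTo(4)))
           .values().get("demo-topic").get();

      // Verify the new partition count.
      final int partitions = admin.describeTopics(List.of("demo-topic"))
                                  .values().get("demo-topic").get()
                                  .partitions().size();
      System.out.println("demo-topic now has " + partitions + " partitions");
    }
  }
}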
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
index 60600d633d2..6db9e3999f6 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java
@@ -25,14 +25,6 @@ package org.apache.druid.tests;
*/
public class TestNGGroup
{
- public static final String KAFKA_INDEX = "kafka-index";
-
- public static final String KAFKA_INDEX_SLOW = "kafka-index-slow";
-
- public static final String TRANSACTIONAL_KAFKA_INDEX =
"kafka-transactional-index";
-
- public static final String TRANSACTIONAL_KAFKA_INDEX_SLOW =
"kafka-transactional-index-slow";
-
/**
* This group can only be run individually using -Dgroups=query since it
requires specific test data setup.
*/
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
deleted file mode 100644
index c36a23ad4ec..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITFaultyClusterTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.coordinator.duty;
-
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.testing.cluster.ClusterTestingTaskConfig;
-import org.apache.druid.testing.cluster.overlord.FaultyLagAggregator;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.EventSerializer;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.tools.StreamGenerator;
-import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
-import org.joda.time.DateTime;
-import org.joda.time.Period;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.Map;
-
-/**
- * Integration test that verifies various faults can be induced in the cluster
- * using {@link ClusterTestingTaskConfig}.
- * <p>
- * Future tests can leverage the cluster testing config to write tests
- * for cluster scalability and stability.
- */
-@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITFaultyClusterTest extends AbstractKafkaIndexingServiceTest
-{
- private static final Logger log = new Logger(ITFaultyClusterTest.class);
-
- private GeneratedTestConfig generatedTestConfig;
- private StreamGenerator streamGenerator;
-
- private String fullDatasourceName;
-
- @DataProvider
- public static Object[] getParameters()
- {
- return new Object[]{false, true};
- }
-
- @BeforeClass
- public void setupClass() throws Exception
- {
- doBeforeClass();
- }
-
- @BeforeMethod
- public void setup() throws Exception
- {
- generatedTestConfig = new GeneratedTestConfig(
- Specs.PARSER_TYPE,
- getResourceAsString(Specs.INPUT_FORMAT_PATH)
- );
- fullDatasourceName = generatedTestConfig.getFullDatasourceName();
- final EventSerializer serializer = jsonMapper.readValue(
- getResourceAsStream(Specs.SERIALIZER_PATH),
- EventSerializer.class
- );
- streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6,
100);
- }
-
- @Override
- public String getTestNamePrefix()
- {
- return "faulty_cluster";
- }
-
- @Test(dataProvider = "getParameters")
- public void test_streamingIngestion_worksWithFaultyCluster(boolean
transactionEnabled) throws Exception
- {
- if (shouldSkipTest(transactionEnabled)) {
- return;
- }
-
- try (
- final Closeable closer = createResourceCloser(generatedTestConfig);
- final StreamEventWriter streamEventWriter =
createStreamEventWriter(config, transactionEnabled)
- ) {
- // Set context parameters
- final Map<String, Object> taskContext = Map.of(
- "clusterTesting",
- Map.of(
- "metadataConfig", Map.of("cleanupPendingSegments", false),
- "coordinatorClientConfig", Map.of("minSegmentHandoffDelay",
"PT10S"),
- "taskActionClientConfig", Map.of("segmentAllocateDelay",
"PT10S", "segmentPublishDelay", "PT10S")
- )
- );
-
- // Start supervisor
- final String supervisorSpec = generatedTestConfig
- .withContext(taskContext)
- .withLagAggregator(new FaultyLagAggregator(1_000_000))
- .getStreamIngestionPropsTransform()
- .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
-
- final String supervisorId = indexer.submitSupervisor(supervisorSpec);
- generatedTestConfig.setSupervisorId(supervisorId);
- log.info("Submitted supervisor[%s] with spec[%s]", supervisorId,
supervisorSpec);
-
- // Generate data for minutes 1, 2 and 3
- final DateTime firstRecordTime = DateTimes.of("2000-01-01T01:01:00Z");
- final long rowsForMinute1 = generateDataForMinute(firstRecordTime,
streamEventWriter);
- final long rowsForMinute2 =
generateDataForMinute(firstRecordTime.plus(Period.minutes(1)),
streamEventWriter);
- final long rowsForMinute3 =
generateDataForMinute(firstRecordTime.plus(Period.minutes(2)),
streamEventWriter);
-
- ITRetryUtil.retryUntilTrue(
- () -> {
- final Integer aggregateLag = (Integer)
indexer.getFullSupervisorStatus(supervisorId).get("aggregateLag");
- log.info("Aggregate lag is [%d].", aggregateLag);
- return aggregateLag != null && aggregateLag > 1_000_000;
- },
- "Aggregate lag exceeds 1M"
- );
-
- // Wait for data to be ingested for all the minutes
- waitUntilDatasourceRowCountEquals(fullDatasourceName, rowsForMinute1 +
rowsForMinute2 + rowsForMinute3);
- waitForSegmentsToLoad(fullDatasourceName);
-
- // 2 segments for each minute, total 6
- waitUntilDatasourceSegmentCountEquals(fullDatasourceName, 6);
- }
- }
-
- /**
- * Generates data points for a minute with the specified start time.
- *
- * @return Number of rows generated.
- */
- private long generateDataForMinute(DateTime firstRecordTime,
StreamEventWriter streamEventWriter)
- {
- final long rowCount = streamGenerator.run(
- generatedTestConfig.getStreamName(),
- streamEventWriter,
- 10,
- firstRecordTime
- );
- log.info("Generated [%d] rows for 1 minute interval with start[%s]",
rowCount, firstRecordTime);
-
- return rowCount;
- }
-
- /**
- * Checks if a test should be skipped based on whether transaction is
enabled or not.
- */
- private boolean shouldSkipTest(boolean testEnableTransaction)
- {
- Map<String, String> kafkaTestProps = KafkaUtil
- .getAdditionalKafkaTestConfigFromProperties(config);
- boolean configEnableTransaction = Boolean.parseBoolean(
- kafkaTestProps.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED,
"false")
- );
-
- return configEnableTransaction != testEnableTransaction;
- }
-
- /**
- * Constants for test specs.
- */
- private static class Specs
- {
- static final String SERIALIZER_PATH = DATA_RESOURCE_ROOT +
"/csv/serializer/serializer.json";
- static final String INPUT_FORMAT_PATH = DATA_RESOURCE_ROOT +
"/csv/input_format/input_format.json";
- static final String PARSER_TYPE = AbstractStreamIndexingTest.INPUT_FORMAT;
- }
-
-}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
deleted file mode 100644
index 2115ed59f7e..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.google.common.base.Preconditions;
-import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
-import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.testing.tools.IntegrationTestingConfig;
-import org.apache.druid.testing.tools.KafkaEventWriter;
-import org.apache.druid.testing.tools.KafkaUtil;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.utils.KafkaAdminClient;
-import org.apache.druid.testing.utils.StreamAdminClient;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.function.Function;
-
-public abstract class AbstractKafkaIndexingServiceTest extends
AbstractStreamIndexingTest
-{
- @Override
- StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
- {
- return new KafkaAdminClient(config);
- }
-
- @Override
- public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig
config, Boolean transactionEnabled)
- {
- return new KafkaEventWriter(config,
Preconditions.checkNotNull(transactionEnabled, "transactionEnabled"));
- }
-
- @Override
- Function<String, String> generateStreamIngestionPropsTransform(
- String supervisorId,
- String streamName,
- String fullDatasourceName,
- String parserType,
- String parserOrInputFormat,
- List<String> dimensions,
- Map<String, Object> context,
- LagAggregator lagAggregator,
- IntegrationTestingConfig config
- )
- {
- final Map<String, Object> consumerConfigs =
KafkaConsumerConfigs.getConsumerProperties();
- final Properties consumerProperties = new Properties();
- consumerProperties.putAll(consumerConfigs);
- consumerProperties.setProperty("bootstrap.servers",
config.getKafkaInternalHost());
- KafkaUtil.addPropertiesFromTestConfig(config, consumerProperties);
- return spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%SUPERVISOR_ID%%",
- supervisorId
- );
- spec = StringUtils.replace(
- spec,
- "%%DATASOURCE%%",
- fullDatasourceName
- );
- spec = StringUtils.replace(
- spec,
- "%%STREAM_TYPE%%",
- "kafka"
- );
- spec = StringUtils.replace(
- spec,
- "%%TOPIC_KEY%%",
- "topic"
- );
- spec = StringUtils.replace(
- spec,
- "%%TOPIC_VALUE%%",
- streamName
- );
-
- if (AbstractStreamIndexingTest.INPUT_FORMAT.equals(parserType)) {
- spec = StringUtils.replace(
- spec,
- "%%INPUT_FORMAT%%",
- parserOrInputFormat
- );
- spec = StringUtils.replace(
- spec,
- "%%PARSER%%",
- "null"
- );
- } else if
(AbstractStreamIndexingTest.INPUT_ROW_PARSER.equals(parserType)) {
- spec = StringUtils.replace(
- spec,
- "%%PARSER%%",
- parserOrInputFormat
- );
- spec = StringUtils.replace(
- spec,
- "%%INPUT_FORMAT%%",
- "null"
- );
- }
- spec = StringUtils.replace(
- spec,
- "%%USE_EARLIEST_KEY%%",
- "useEarliestOffset"
- );
- spec = StringUtils.replace(
- spec,
- "%%STREAM_PROPERTIES_KEY%%",
- "consumerProperties"
- );
- spec = StringUtils.replace(
- spec,
- "%%SCHEMA_REGISTRY_HOST%%",
- StringUtils.format("http://%s",
config.getSchemaRegistryInternalHost())
- );
- spec = StringUtils.replace(
- spec,
- "%%DIMENSIONS%%",
- jsonMapper.writeValueAsString(dimensions)
- );
- spec = StringUtils.replace(
- spec,
- "%%CONTEXT%%",
- jsonMapper.writeValueAsString(context)
- );
- spec = StringUtils.replace(
- spec,
- "%%LAG_AGGREGATOR%%",
- jsonMapper.writeValueAsString(lagAggregator)
- );
- return StringUtils.replace(
- spec,
- "%%STREAM_PROPERTIES_VALUE%%",
- jsonMapper.writeValueAsString(consumerProperties)
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
- }
-
- @Override
- Function<String, String> generateStreamQueryPropsTransform(String
streamName, String fullDatasourceName)
- {
- return spec -> {
- try {
- spec = StringUtils.replace(
- spec,
- "%%DATASOURCE%%",
- fullDatasourceName
- );
- spec = StringUtils.replace(
- spec,
- "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
- TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
- );
- spec = StringUtils.replace(
- spec,
- "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
-
TIMESTAMP_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND - 1))
- );
- spec = StringUtils.replace(
- spec,
- "%%TIMEBOUNDARY_RESPONSE_MINTIME%%",
- TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
- );
- spec = StringUtils.replace(
- spec,
- "%%TIMESERIES_QUERY_START%%",
- INTERVAL_FMT.print(FIRST_EVENT_TIME)
- );
- spec = StringUtils.replace(
- spec,
- "%%TIMESERIES_QUERY_END%%",
-
INTERVAL_FMT.print(FIRST_EVENT_TIME.plusSeconds(TOTAL_NUMBER_OF_SECOND -
1).plusMinutes(2))
- );
- spec = StringUtils.replace(
- spec,
- "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
- TIMESTAMP_FMT.print(FIRST_EVENT_TIME)
- );
- spec = StringUtils.replace(
- spec,
- "%%TIMESERIES_ADDED%%",
- Long.toString(getSumOfEventSequence(EVENTS_PER_SECOND) *
TOTAL_NUMBER_OF_SECOND)
- );
- return StringUtils.replace(
- spec,
- "%%TIMESERIES_NUMEVENTS%%",
- Integer.toString(EVENTS_PER_SECOND * TOTAL_NUMBER_OF_SECOND)
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- };
- }
-}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
deleted file mode 100644
index 88510139d91..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceNonTransactionalSerializedTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = {TestNGGroup.KAFKA_INDEX_SLOW})
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceNonTransactionalSerializedTest extends
AbstractKafkaIndexingServiceTest
-{
- @Override
- public String getTestNamePrefix()
- {
- return "kafka_nontransactional_serialized";
- }
-
- @BeforeClass
- public void beforeClass() throws Exception
- {
- doBeforeClass();
- }
-
- /**
- * This test must be run individually since it affects and modifies the
- * state of the Druid cluster.
- */
- @Test
- public void testKafkaIndexDataWithLosingCoordinator() throws Exception
- {
- doTestIndexDataWithLosingCoordinator(false);
- }
-
- /**
- * This test must be run individually since it affects and modifies the
- * state of the Druid cluster.
- */
- @Test
- public void testKafkaIndexDataWithLosingOverlord() throws Exception
- {
- doTestIndexDataWithLosingOverlord(false);
- }
-
- /**
- * This test must be run individually since it affects and modifies the
- * state of the Druid cluster.
- */
- @Test
- public void testKafkaIndexDataWithLosingHistorical() throws Exception
- {
- doTestIndexDataWithLosingHistorical(false);
- }
-}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
deleted file mode 100644
index e729680159d..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceStopTasksEarlyTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceStopTasksEarlyTest extends
AbstractKafkaIndexingServiceTest
-{
- @Override
- public String getTestNamePrefix()
- {
- return "kafka_stop_tasks_early";
- }
-
- @BeforeClass
- public void beforeClass() throws Exception
- {
- doBeforeClass();
- }
-
- // This test does not actually check whether the tasks stopped early, since
- // the API does not make any guarantees about handoff. Instead, it makes sure
- // the handoff API can be called and that the tasks eventually catch up with
- // new data.
- @Test
- public void testStopTasksEarly() throws Exception
- {
- doTestIndexDataHandoffEarly(false);
- }
-}
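For reference, the early-handoff call exercised by doTestIndexDataHandoffEarly corresponds to the supervisor API documented in docs/api-reference/supervisor-api.md (also touched by this commit). A minimal sketch of invoking it over HTTP follows; the router address, supervisor id, and task group id are placeholders, and the exact endpoint path should be checked against the API reference.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class EarlyHandoffSketch
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder router address and supervisor id.
    final String url = "http://localhost:8888/druid/indexer/v1/supervisor/"
                       + "my_supervisor/taskGroups/handoff";

    final HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .header("Content-Type", "application/json")
        // Ask task group 0 to hand off its segments early.
        .POST(HttpRequest.BodyPublishers.ofString("{\"taskGroupIds\": [0]}"))
        .build();

    final HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + ": " + response.body());
  }
}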
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
deleted file mode 100644
index fdd06ff4f88..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalSerializedTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTransactionalSerializedTest extends
AbstractKafkaIndexingServiceTest
-{
- @Override
- public String getTestNamePrefix()
- {
- return "kafka_transactional_serialized";
- }
-
- @BeforeClass
- public void beforeClass() throws Exception
- {
- doBeforeClass();
- }
-
- /**
- * This test must be run individually since it affects and modifies the
- * state of the Druid cluster.
- */
- @Test
- public void testKafkaIndexDataWithLosingCoordinator() throws Exception
- {
- doTestIndexDataWithLosingCoordinator(true);
- }
-
- /**
- * This test must be run individually since it affects and modifies the
- * state of the Druid cluster.
- */
- @Test
- public void testKafkaIndexDataWithLosingOverlord() throws Exception
- {
- doTestIndexDataWithLosingOverlord(true);
- }
-
- /**
- * This test must be run individually since it affects and modifies the
- * state of the Druid cluster.
- */
- @Test
- public void testKafkaIndexDataWithLosingHistorical() throws Exception
- {
- doTestIndexDataWithLosingHistorical(true);
- }
-}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
deleted file mode 100644
index 7a7bd7169cf..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNilColumnTest.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.sql.http.SqlQuery;
-import org.apache.druid.testing.clients.TaskResponseObject;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.apache.druid.testing.tools.JsonEventSerializer;
-import org.apache.druid.testing.tools.StreamEventWriter;
-import org.apache.druid.testing.tools.StreamGenerator;
-import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
-import org.apache.druid.testing.utils.SqlQueryWithResults;
-import org.apache.druid.tests.TestNGGroup;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX_SLOW)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITNilColumnTest extends AbstractKafkaIndexingServiceTest
-{
- private static final Logger LOG = new Logger(ITNilColumnTest.class);
- private static final String NIL_DIM1 = "nilDim1";
- private static final String NIL_DIM2 = "nilDim2";
-
- private final List<String> dimensions;
-
- public ITNilColumnTest()
- {
- this.dimensions = new ArrayList<>(DEFAULT_DIMENSIONS.size() + 2);
- dimensions.add(NIL_DIM1);
- dimensions.addAll(DEFAULT_DIMENSIONS);
- dimensions.add(NIL_DIM2);
- }
-
- @Override
- public String getTestNamePrefix()
- {
- return "nil-column-test";
- }
-
- @BeforeClass
- public void beforeClass() throws Exception
- {
- doBeforeClass();
- }
-
- @Test
- public void testQueryNilColumnBeforeAndAfterPublishingSegments() throws
Exception
- {
- final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
- INPUT_FORMAT,
- getResourceAsString(JSON_INPUT_FORMAT_PATH),
- dimensions
- );
- try (
- final Closeable closer = createResourceCloser(generatedTestConfig);
- final StreamEventWriter streamEventWriter =
createStreamEventWriter(config, true)
- ) {
- final String taskSpec =
generatedTestConfig.getStreamIngestionPropsTransform()
-
.apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
- LOG.info("supervisorSpec: [%s]\n", taskSpec);
- // Start supervisor
- generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
- LOG.info("Submitted supervisor");
- // Start generating half of the data
- final StreamGenerator streamGenerator = new
WikipediaStreamEventStreamGenerator(
- new JsonEventSerializer(jsonMapper),
- EVENTS_PER_SECOND,
- CYCLE_PADDING_MS
- );
- long numWritten = streamGenerator.run(
- generatedTestConfig.getStreamName(),
- streamEventWriter,
- TOTAL_NUMBER_OF_SECOND,
- FIRST_EVENT_TIME
- );
- // Verify supervisor is healthy before suspension
- ITRetryUtil.retryUntil(
- () ->
BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
- true,
- 10000,
- 30,
- "Waiting for supervisor to be healthy"
- );
- // 60 events should have been ingested, as per EVENTS_PER_SECOND and
- // TOTAL_NUMBER_OF_SECOND. Since maxRowsInMemory is set to 500,000, every
- // row should still be in the incremental index. So, let's test whether SQL
- // finds the nil dimensions in the incremental indexes.
-
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
- final List<SqlQueryWithResults> queryWithResults =
getQueryWithResults(generatedTestConfig);
-
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
- final List<SqlQueryWithResults> metadataQueryWithResults =
getMetadataQueryWithResults(generatedTestConfig);
-
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
-
- // Suspend the supervisor
- indexer.terminateSupervisor(generatedTestConfig.getSupervisorId());
- ITRetryUtil.retryUntilTrue(
- () -> {
- List<TaskResponseObject> tasks = indexer
- .getRunningTasks()
- .stream()
- .filter(task ->
task.getId().contains(generatedTestConfig.getFullDatasourceName()))
- .filter(task -> "index_kafka".equals(task.getType()))
- .collect(Collectors.toList());
- LOG.info("[%s] tasks are running", tasks.stream().map(task -> {
- try {
- return jsonMapper.writeValueAsString(task);
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList()));
- return tasks.isEmpty();
- },
- "Waiting for all tasks to stop"
- );
-
- // Now, we should have published all segments.
- // Let's test whether SQL finds the nil dimensions in the queryable indexes.
-
dataLoaderHelper.waitUntilDatasourceIsReady(generatedTestConfig.getFullDatasourceName());
- verifyIngestedData(generatedTestConfig, numWritten);
-
-
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(queryWithResults));
-
sqlQueryHelper.testQueriesFromString(jsonMapper.writeValueAsString(metadataQueryWithResults));
- }
- }
-
- private static List<SqlQueryWithResults>
getQueryWithResults(GeneratedTestConfig generatedTestConfig)
- {
- return ImmutableList.of(
- new SqlQueryWithResults(
- new SqlQuery(
- StringUtils.format(
- "SELECT count(*) FROM \"%s\" WHERE %s IS NOT NULL OR %s IS
NOT NULL",
- generatedTestConfig.getFullDatasourceName(),
- NIL_DIM1,
- NIL_DIM2
- ),
- null,
- false,
- false,
- false,
- null,
- null
- ),
- "Get row count of datasource",
- ImmutableList.of(ImmutableMap.of("EXPR$0", 0))
- )
- );
- }
-
- private List<SqlQueryWithResults>
getMetadataQueryWithResults(GeneratedTestConfig generatedTestConfig)
- {
- return ImmutableList.of(
- new SqlQueryWithResults(
- new SqlQuery(
- StringUtils.format(
- "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
- + " FROM INFORMATION_SCHEMA.COLUMNS"
- + " WHERE TABLE_NAME = '%s' AND COLUMN_NAME IN ('%s',
'%s')",
- generatedTestConfig.getFullDatasourceName(),
- NIL_DIM1,
- NIL_DIM2
- ),
- null,
- false,
- false,
- false,
- null,
- null
- ),
- "Get column info of datasource",
- ImmutableList.of(
- ImmutableMap.of(
- "COLUMN_NAME",
- NIL_DIM1,
- "IS_NULLABLE",
- "YES",
- "DATA_TYPE",
- "VARCHAR"
- ),
- ImmutableMap.of(
- "COLUMN_NAME",
- NIL_DIM2,
- "IS_NULLABLE",
- "YES",
- "DATA_TYPE",
- "VARCHAR"
- )
- )
- )
- );
- }
-}
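The INFORMATION_SCHEMA check above can also be issued directly against Druid's SQL endpoint when debugging a similar failure by hand. A minimal sketch, assuming a router at localhost:8888 and mirroring the placeholder datasource and dimension names from the deleted test:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class NilColumnMetadataQuerySketch
{
  public static void main(String[] args) throws Exception
  {
    // Datasource and dimension names are placeholders taken from the test above.
    final String sql = "SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE"
                       + " FROM INFORMATION_SCHEMA.COLUMNS"
                       + " WHERE TABLE_NAME = 'wiki'"
                       + " AND COLUMN_NAME IN ('nilDim1', 'nilDim2')";

    final HttpRequest request = HttpRequest
        .newBuilder(URI.create("http://localhost:8888/druid/v2/sql"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("{\"query\": \"" + sql + "\"}"))
        .build();

    final HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());
  }
}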
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
deleted file mode 100644
index 7f3f41316e3..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.parallelized;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends
AbstractKafkaIndexingServiceTest
-{
- @Override
- public String getTestNamePrefix()
- {
- return "kafka_non_transactional_parallelized";
- }
-
- @BeforeClass
- public void beforeClass() throws Exception
- {
- doBeforeClass();
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource and supervisor maintained
- * and scoped within this test only.
- */
- @Test
- public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
- {
- doTestIndexDataWithStartStopSupervisor(false);
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource and supervisor maintained
- * and scoped within this test only.
- */
- @Test
- public void testKafkaIndexDataWithWithAutoscaler() throws Exception
- {
- doTestIndexDataWithAutoscaler(false);
- }
-
- @Test
- public void testIndexDataWithIdleConfigEnabled() throws Exception
- {
- doTestIndexDataWithIdleConfigEnabled(false);
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource and supervisor maintained
- * and scoped within this test only.
- */
- @Test
- public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
- {
- doTestIndexDataWithStreamReshardSplit(false);
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource and supervisor maintained
- * and scoped within this test only.
- */
- @Test
- public void testKafkaTerminatedSupervisorAutoCleanup() throws Exception
- {
- doTestTerminatedSupervisorAutoCleanup(false);
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource with supervisor(s)
- * maintained and scoped within this test only.
- */
- @Test
- public void testKafkaIndexMultiSupervisorWithNoTransaction() throws Exception
- {
- doTestMultiSupervisorIndexDataStableState(
- false,
- 2
- );
- }
-}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
deleted file mode 100644
index 49ca3e9c226..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceTransactionalParallelizedTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.parallelized;
-
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.tests.TestNGGroup;
-import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-@Test(groups = TestNGGroup.TRANSACTIONAL_KAFKA_INDEX)
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITKafkaIndexingServiceTransactionalParallelizedTest extends
AbstractKafkaIndexingServiceTest
-{
- @Override
- public String getTestNamePrefix()
- {
- return "kafka_transactional_parallelized";
- }
-
- @BeforeClass
- public void beforeClass() throws Exception
- {
- doBeforeClass();
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource and supervisor maintained
- * and scoped within this test only.
- */
- @Test
- public void testKafkaIndexDataWithStartStopSupervisor() throws Exception
- {
- doTestIndexDataWithStartStopSupervisor(true);
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource and supervisor maintained
- * and scoped within this test only.
- */
- @Test
- public void testKafkaIndexDataWithKafkaReshardSplit() throws Exception
- {
- doTestIndexDataWithStreamReshardSplit(true);
- }
-
- /**
- * This test can be run concurrently with other tests as it creates,
- * modifies, and tears down a unique datasource with supervisor(s)
- * maintained and scoped within this test only.
- */
- @Test
- public void testKafkaIndexMultiSupervisorWithTransaction() throws Exception
- {
- doTestMultiSupervisorIndexDataStableState(
- true,
- 2
- );
- }
-}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java
index ebe1018f4d5..04d22888d03 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorReport.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord.supervisor;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
@@ -30,7 +31,12 @@ public class SupervisorReport<T>
private final DateTime generationTime;
private final T payload;
- public SupervisorReport(String id, DateTime generationTime, T payload)
+ @JsonCreator
+ public SupervisorReport(
+ @JsonProperty("id") String id,
+ @JsonProperty("generationTime") DateTime generationTime,
+ @JsonProperty("payload") T payload
+ )
{
this.id = id;
this.generationTime = generationTime;
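The @JsonCreator added above is what makes supervisor reports round-trippable, which the embedded tests rely on when reading supervisor status. A minimal deserialization sketch, assuming jackson-datatype-joda is on the classpath (Druid's injected ObjectMapper already handles Joda types) and that the class exposes conventional getters:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import java.util.Map;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;

public class SupervisorReportRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    // Register Joda support so the generationTime field can be parsed.
    final ObjectMapper mapper = new ObjectMapper().registerModule(new JodaModule());

    final String json = "{\"id\": \"sup1\","
                        + " \"generationTime\": \"2026-01-04T00:00:00.000Z\","
                        + " \"payload\": {\"dataSource\": \"wiki\"}}";

    // Bind the payload as a generic Map here; a typed payload such as
    // KafkaSupervisorReportPayload can be bound with the same pattern.
    final SupervisorReport<Map<String, Object>> report = mapper.readValue(
        json,
        new TypeReference<SupervisorReport<Map<String, Object>>>() {}
    );
    System.out.println(report.getId() + " -> " + report.getPayload());
  }
}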
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index 0a1a1051f2e..d796d240987 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -235,7 +235,19 @@ public class EmbeddedClusterApis implements
EmbeddedResource
*/
public int getTaskCount(String status, String dataSource)
{
- return ImmutableList.copyOf((Iterator<?>) onLeaderOverlord(o ->
o.taskStatuses(status, dataSource, 100))).size();
+ return getTasks(dataSource, status).size();
+ }
+
+ /**
+ * Gets the details of tasks with the given state for the specified
datasource.
+ * Valid task states are "pending", "waiting", "running", "complete".
+ */
+ public List<TaskStatusPlus> getTasks(String dataSource, String taskState)
+ {
+ return ImmutableList.copyOf(
+ (Iterator<? extends TaskStatusPlus>)
+ onLeaderOverlord(o -> o.taskStatuses(taskState, dataSource, 100))
+ );
}
/**
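A short usage sketch for the new getTasks helper; the clusterApis handle and the "wiki" datasource are assumptions standing in for whatever the embedded test has wired up:

import java.util.List;
import org.apache.druid.indexer.TaskStatusPlus;

// Inside an embedded test, where clusterApis is an EmbeddedClusterApis
// instance and "wiki" is a placeholder datasource.
List<TaskStatusPlus> running = clusterApis.getTasks("wiki", "running");
for (TaskStatusPlus task : running) {
  System.out.println(task.getId() + " -> " + task.getStatusCode());
}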
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
index 04b60c03118..beacab52920 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
@@ -185,7 +185,8 @@ public class EmbeddedServiceClient
try {
StatusResponseHolder response = serviceClient.request(requestBuilder,
responseHandler);
- if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+ if (!response.getStatus().equals(HttpResponseStatus.OK)
+ && !response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
throw new ISE(
"Request[%s] failed with status[%s] content[%s].",
requestBuilder.toString(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]