This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7967a2547b1 Migrate Coordinator pause IT to embedded tests (#18473)
7967a2547b1 is described below
commit 7967a2547b13f806d563858765d8f0435f4250d7
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Sep 3 16:41:26 2025 +0530
Migrate Coordinator pause IT to embedded tests (#18473)
Changes:
- Migrate `ITTestCoordinatorPausedTest` to `CoordinatorPauseTest`
- Remove CI job `integration-other-tests` from both the standard and cron IT workflows as it had only the above test
example run
- Make `DutyGroupStatus` and `CoordinatorDutyStatus` deserializable for use in the tests
---
.github/workflows/cron-job-its.yml | 14 ---
.github/workflows/standard-its.yml | 13 ---
.../embedded/server/CoordinatorPauseTest.java | 119 +++++++++++++++++++++
.../tests/indexer/ITTestCoordinatorPausedTest.java | 94 ----------------
.../coordinator/duty/CoordinatorDutyGroup.java | 3 +-
.../server/coordinator/duty/DutyGroupStatus.java | 16 +--
.../druid/server/http/CoordinatorDutyStatus.java | 6 +-
7 files changed, 134 insertions(+), 131 deletions(-)
diff --git a/.github/workflows/cron-job-its.yml
b/.github/workflows/cron-job-its.yml
index c6dda83d4a3..4cba39ebbfe 100644
--- a/.github/workflows/cron-job-its.yml
+++ b/.github/workflows/cron-job-its.yml
@@ -99,20 +99,6 @@ jobs:
override_config_path: ./environment-configs/test-groups/prepopulated-data
group: ${{ matrix.testing_group }}
- integration-other-tests:
- strategy:
- fail-fast: false
- matrix:
- indexer: [ middleManager, indexer ]
- uses: ./.github/workflows/reusable-standard-its.yml
- needs: build
- with:
- build_jdk: 17
- runtime_jdk: 17
- testing_groups:
-DexcludedGroups=batch-index,input-format,input-source,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-stora
[...]
- use_indexer: ${{ matrix.indexer }}
- group: other
-
security_vulnerabilities:
if: github.repository == 'apache/druid'
name: security vulnerabilities
diff --git a/.github/workflows/standard-its.yml
b/.github/workflows/standard-its.yml
index 8eac516529d..7c786d9bdf1 100644
--- a/.github/workflows/standard-its.yml
+++ b/.github/workflows/standard-its.yml
@@ -171,16 +171,3 @@ jobs:
/usr/local/bin/kubectl logs --tail 1000 druid-tiny-cluster-"$v"s-0
||:;
/usr/local/bin/kubectl get events | grep druid-tiny-cluster-"$v"s-0
||:;
done
-
- integration-other-tests:
- strategy:
- fail-fast: false
- matrix:
- indexer: [middleManager, indexer]
- uses: ./.github/workflows/reusable-standard-its.yml
- with:
- build_jdk: 17
- runtime_jdk: 17
- testing_groups:
-DexcludedGroups=batch-index,input-format,input-source,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-stora
[...]
- use_indexer: ${{ matrix.indexer }}
- group: other
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorPauseTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorPauseTest.java
new file mode 100644
index 00000000000..32725c1f091
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorPauseTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
+import org.apache.druid.server.http.CoordinatorDutyStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class CoordinatorPauseTest extends EmbeddedClusterTestBase
+{
+ private static final Duration COORDINATOR_DUTY_PERIOD =
Duration.ofMillis(100);
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator()
+ .addProperty("druid.coordinator.period",
COORDINATOR_DUTY_PERIOD.toString());
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ return EmbeddedDruidCluster
+ .withEmbeddedDerbyAndZookeeper()
+ .useLatchableEmitter()
+ .addServer(overlord)
+ .addServer(coordinator)
+ .addServer(new EmbeddedIndexer())
+ .addServer(new EmbeddedBroker())
+ .addServer(new EmbeddedHistorical())
+ .addServer(new EmbeddedRouter());
+ }
+
+ @Test
+ public void test_segmentsAreNotLoaded_ifCoordinatorIsPaused() throws
Exception
+ {
+ // Pause coordinator cycles
+ cluster.callApi().onLeaderCoordinator(
+ c -> c.updateCoordinatorDynamicConfig(
+
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build()
+ )
+ );
+ final DateTime pauseTime = DateTimes.nowUtc();
+
+ // Perform some basic ingestion
+ final Task task = MoreResources.Task.BASIC_INDEX
+ .get()
+ .dataSource(dataSource)
+ .withId(IdUtils.getRandomId());
+ cluster.callApi().runTask(task, overlord);
+
+ // Verify that no coordinator duties have run even after 10 periods
+ Thread.sleep(COORDINATOR_DUTY_PERIOD.toMillis() * 10);
+ final CoordinatorDutyStatus status =
cluster.callApi().serviceClient().onLeaderCoordinator(
+ mapper -> new RequestBuilder(HttpMethod.GET,
"/druid/coordinator/v1/duties"),
+ new TypeReference<>() {}
+ );
+ Assertions.assertNotNull(status);
+
+ final Optional<DutyGroupStatus> matchingDutyStatus =
status.getDutyGroups().stream().filter(
+ group -> group.getName().equals("HistoricalManagementDuties")
+ ).findFirst();
+ Assertions.assertTrue(matchingDutyStatus.isPresent());
+
+ // Verify that the last run was before the pause and all segments are
unavailable
+ final DutyGroupStatus historicalDutyStatus = matchingDutyStatus.get();
+
Assertions.assertTrue(historicalDutyStatus.getLastRunStart().isBefore(pauseTime));
+ cluster.callApi().verifySqlQuery(
+ "SELECT COUNT(*) FROM sys.segments WHERE is_available = 0 AND
datasource = '%s'",
+ dataSource,
+ "10"
+ );
+
+ // Un-pause the Coordinator
+ cluster.callApi().onLeaderCoordinator(
+ c -> c.updateCoordinatorDynamicConfig(
+ CoordinatorDynamicConfig.builder().build()
+ )
+ );
+
+ // Verify that segments are finally loaded on the Historical
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+ cluster.callApi().verifySqlQuery("SELECT COUNT(*) FROM %s", dataSource,
"10");
+ }
+}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java
deleted file mode 100644
index bf2a3ca946b..00000000000
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest
-{
- private static final Logger LOG = new
Logger(ITTestCoordinatorPausedTest.class);
- private static final String INDEX_DATASOURCE = "wikipedia_index_test";
- private static final String INDEX_TASK =
"/indexer/wikipedia_index_task.json";
- private static final String INDEX_QUERIES_RESOURCE =
"/indexer/wikipedia_index_queries.json";
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
- CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
- private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
- CoordinatorDynamicConfig.builder().build();
-
- @Inject
- CoordinatorResourceTestClient coordinatorClient;
-
- @Test
- public void testCoordinatorPause() throws Exception
- {
- try (
- final Closeable ignored1 = unloader(INDEX_DATASOURCE +
config.getExtraDatasourceNameSuffix())
- ) {
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
-
- final Function<String, String> transform = spec -> {
- try {
- return StringUtils.replace(
- spec,
- "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
- jsonMapper.writeValueAsString("0")
- );
- }
- catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- };
-
- doIndexTest(
- INDEX_DATASOURCE,
- INDEX_TASK,
- transform,
- INDEX_QUERIES_RESOURCE,
- false,
- false,
- false,
- new Pair<>(false, false)
- );
- TimeUnit.MINUTES.sleep(3);
- if (coordinatorClient.areSegmentsLoaded(INDEX_DATASOURCE)) {
- throw new IllegalStateException("Segments Were Loaded Early!");
- }
- coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
- ITRetryUtil.retryUntilTrue(
- () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE +
config.getExtraDatasourceNameSuffix()), "Segment Load"
- );
- }
- }
-}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
index 642744ae878..44b98061642 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
@@ -101,14 +101,13 @@ public class CoordinatorDutyGroup
*/
public void run(DruidCoordinatorRuntimeParams params)
{
- markRunStarted();
-
final boolean coordinationPaused =
params.getCoordinatorDynamicConfig().getPauseCoordination();
if (coordinationPaused && coordinator.isLeader()) {
log.info("Coordination has been paused. Duties will not run until
coordination is resumed.");
return;
}
+ markRunStarted();
final CoordinatorRunStats stats = params.getCoordinatorStats();
for (CoordinatorDuty duty : duties) {
if (coordinator.isLeader()) {
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
b/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
index 72b1852c043..b0b15f86fb9 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.coordinator.duty;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@@ -37,14 +38,15 @@ public class DutyGroupStatus
private final long avgRuntimeMillis;
private final long avgRunGapMillis;
+ @JsonCreator
public DutyGroupStatus(
- String name,
- Duration period,
- List<String> dutyNames,
- DateTime lastRunStart,
- DateTime lastRunEnd,
- long avgRuntimeMillis,
- long avgRunGapMillis
+ @JsonProperty("name") String name,
+ @JsonProperty("period") Duration period,
+ @JsonProperty("dutyNames") List<String> dutyNames,
+ @JsonProperty("lastRunStart") DateTime lastRunStart,
+ @JsonProperty("lastRunEnd") DateTime lastRunEnd,
+ @JsonProperty("avgRuntimeMillis") long avgRuntimeMillis,
+ @JsonProperty("avgRunGapMillis") long avgRunGapMillis
)
{
this.name = name;
diff --git
a/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java
b/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java
index a7b8cdf7c4a..a924a669fbd 100644
---
a/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java
+++
b/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java
@@ -19,6 +19,7 @@
package org.apache.druid.server.http;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
@@ -28,7 +29,10 @@ public class CoordinatorDutyStatus
{
private final List<DutyGroupStatus> dutyGroups;
- public CoordinatorDutyStatus(List<DutyGroupStatus> dutyGroups)
+ @JsonCreator
+ public CoordinatorDutyStatus(
+ @JsonProperty("dutyGroups") List<DutyGroupStatus> dutyGroups
+ )
{
this.dutyGroups = dutyGroups;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]