This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7967a2547b1 Migrate Coordinator pause IT to embedded tests (#18473)
7967a2547b1 is described below

commit 7967a2547b13f806d563858765d8f0435f4250d7
Author: Kashif Faraz <[email protected]>
AuthorDate: Wed Sep 3 16:41:26 2025 +0530

    Migrate Coordinator pause IT to embedded tests (#18473)
    
    Changes:
    - Migrate `ITTestCoordinatorPause` to `CoordinatorPauseTest`
    - Remove CI job integration-other-tests as it had only the above test 
example run
    - Make `DutyGroupStatus` deserializable to use in the tests
---
 .github/workflows/cron-job-its.yml                 |  14 ---
 .github/workflows/standard-its.yml                 |  13 ---
 .../embedded/server/CoordinatorPauseTest.java      | 119 +++++++++++++++++++++
 .../tests/indexer/ITTestCoordinatorPausedTest.java |  94 ----------------
 .../coordinator/duty/CoordinatorDutyGroup.java     |   3 +-
 .../server/coordinator/duty/DutyGroupStatus.java   |  16 +--
 .../druid/server/http/CoordinatorDutyStatus.java   |   6 +-
 7 files changed, 134 insertions(+), 131 deletions(-)

diff --git a/.github/workflows/cron-job-its.yml 
b/.github/workflows/cron-job-its.yml
index c6dda83d4a3..4cba39ebbfe 100644
--- a/.github/workflows/cron-job-its.yml
+++ b/.github/workflows/cron-job-its.yml
@@ -99,20 +99,6 @@ jobs:
       override_config_path: ./environment-configs/test-groups/prepopulated-data
       group: ${{ matrix.testing_group }}
 
-  integration-other-tests:
-    strategy:
-      fail-fast: false
-      matrix:
-        indexer: [ middleManager, indexer ]
-    uses: ./.github/workflows/reusable-standard-its.yml
-    needs: build
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      testing_groups: 
-DexcludedGroups=batch-index,input-format,input-source,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-stora
 [...]
-      use_indexer: ${{ matrix.indexer }}
-      group: other
-
   security_vulnerabilities:
     if: github.repository == 'apache/druid'
     name: security vulnerabilities
diff --git a/.github/workflows/standard-its.yml 
b/.github/workflows/standard-its.yml
index 8eac516529d..7c786d9bdf1 100644
--- a/.github/workflows/standard-its.yml
+++ b/.github/workflows/standard-its.yml
@@ -171,16 +171,3 @@ jobs:
           /usr/local/bin/kubectl logs --tail 1000 druid-tiny-cluster-"$v"s-0 
||:;
           /usr/local/bin/kubectl get events | grep druid-tiny-cluster-"$v"s-0 
||:;
           done
-
-  integration-other-tests:
-    strategy:
-      fail-fast: false
-      matrix:
-        indexer: [middleManager, indexer]
-    uses: ./.github/workflows/reusable-standard-its.yml
-    with:
-      build_jdk: 17
-      runtime_jdk: 17
-      testing_groups: 
-DexcludedGroups=batch-index,input-format,input-source,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-stora
 [...]
-      use_indexer: ${{ matrix.indexer }}
-      group: other
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorPauseTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorPauseTest.java
new file mode 100644
index 00000000000..32725c1f091
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/CoordinatorPauseTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.server;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
+import org.apache.druid.server.http.CoordinatorDutyStatus;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.EmbeddedRouter;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Optional;
+
+public class CoordinatorPauseTest extends EmbeddedClusterTestBase
+{
+  private static final Duration COORDINATOR_DUTY_PERIOD = 
Duration.ofMillis(100);
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator()
+      .addProperty("druid.coordinator.period", 
COORDINATOR_DUTY_PERIOD.toString());
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .addServer(overlord)
+        .addServer(coordinator)
+        .addServer(new EmbeddedIndexer())
+        .addServer(new EmbeddedBroker())
+        .addServer(new EmbeddedHistorical())
+        .addServer(new EmbeddedRouter());
+  }
+
+  @Test
+  public void test_segmentsAreNotLoaded_ifCoordinatorIsPaused() throws 
Exception
+  {
+    // Pause coordinator cycles
+    cluster.callApi().onLeaderCoordinator(
+        c -> c.updateCoordinatorDynamicConfig(
+            
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build()
+        )
+    );
+    final DateTime pauseTime = DateTimes.nowUtc();
+
+    // Perform some basic ingestion
+    final Task task = MoreResources.Task.BASIC_INDEX
+        .get()
+        .dataSource(dataSource)
+        .withId(IdUtils.getRandomId());
+    cluster.callApi().runTask(task, overlord);
+
+    // Verify that no coordinator duties have run even after 10 periods
+    Thread.sleep(COORDINATOR_DUTY_PERIOD.toMillis() * 10);
+    final CoordinatorDutyStatus status = 
cluster.callApi().serviceClient().onLeaderCoordinator(
+        mapper -> new RequestBuilder(HttpMethod.GET, 
"/druid/coordinator/v1/duties"),
+        new TypeReference<>() {}
+    );
+    Assertions.assertNotNull(status);
+
+    final Optional<DutyGroupStatus> matchingDutyStatus = 
status.getDutyGroups().stream().filter(
+        group -> group.getName().equals("HistoricalManagementDuties")
+    ).findFirst();
+    Assertions.assertTrue(matchingDutyStatus.isPresent());
+
+    // Verify that the last run was before the pause and all segments are 
unavailable
+    final DutyGroupStatus historicalDutyStatus = matchingDutyStatus.get();
+    
Assertions.assertTrue(historicalDutyStatus.getLastRunStart().isBefore(pauseTime));
+    cluster.callApi().verifySqlQuery(
+        "SELECT COUNT(*) FROM sys.segments WHERE is_available = 0 AND 
datasource = '%s'",
+        dataSource,
+        "10"
+    );
+
+    // Un-pause the Coordinator
+    cluster.callApi().onLeaderCoordinator(
+        c -> c.updateCoordinatorDynamicConfig(
+            CoordinatorDynamicConfig.builder().build()
+        )
+    );
+
+    // Verify that segments are finally loaded on the Historical
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator);
+    cluster.callApi().verifySqlQuery("SELECT COUNT(*) FROM %s", dataSource, 
"10");
+  }
+}
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java
deleted file mode 100644
index bf2a3ca946b..00000000000
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITTestCoordinatorPausedTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.tests.indexer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.inject.Inject;
-import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
-import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
-import org.apache.druid.testing.guice.DruidTestModuleFactory;
-import org.apache.druid.testing.tools.ITRetryUtil;
-import org.testng.annotations.Guice;
-import org.testng.annotations.Test;
-
-import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-@Guice(moduleFactory = DruidTestModuleFactory.class)
-public class ITTestCoordinatorPausedTest extends AbstractITBatchIndexTest
-{
-  private static final Logger LOG = new 
Logger(ITTestCoordinatorPausedTest.class);
-  private static final String INDEX_DATASOURCE = "wikipedia_index_test";
-  private static final String INDEX_TASK = 
"/indexer/wikipedia_index_task.json";
-  private static final String INDEX_QUERIES_RESOURCE = 
"/indexer/wikipedia_index_queries.json";
-  private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
-      CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
-  private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
-      CoordinatorDynamicConfig.builder().build();
-
-  @Inject
-  CoordinatorResourceTestClient coordinatorClient;
-
-  @Test
-  public void testCoordinatorPause() throws Exception
-  {
-    try (
-        final Closeable ignored1 = unloader(INDEX_DATASOURCE + 
config.getExtraDatasourceNameSuffix())
-    ) {
-      coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_PAUSED);
-
-      final Function<String, String> transform = spec -> {
-        try {
-          return StringUtils.replace(
-              spec,
-              "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
-              jsonMapper.writeValueAsString("0")
-          );
-        }
-        catch (JsonProcessingException e) {
-          throw new RuntimeException(e);
-        }
-      };
-
-      doIndexTest(
-          INDEX_DATASOURCE,
-          INDEX_TASK,
-          transform,
-          INDEX_QUERIES_RESOURCE,
-          false,
-          false,
-          false,
-          new Pair<>(false, false)
-      );
-      TimeUnit.MINUTES.sleep(3);
-      if (coordinatorClient.areSegmentsLoaded(INDEX_DATASOURCE)) {
-        throw new IllegalStateException("Segments Were Loaded Early!");
-      }
-      coordinatorClient.postDynamicConfig(DYNAMIC_CONFIG_DEFAULT);
-      ITRetryUtil.retryUntilTrue(
-          () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE + 
config.getExtraDatasourceNameSuffix()), "Segment Load"
-      );
-    }
-  }
-}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
index 642744ae878..44b98061642 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/CoordinatorDutyGroup.java
@@ -101,14 +101,13 @@ public class CoordinatorDutyGroup
    */
   public void run(DruidCoordinatorRuntimeParams params)
   {
-    markRunStarted();
-
     final boolean coordinationPaused = 
params.getCoordinatorDynamicConfig().getPauseCoordination();
     if (coordinationPaused && coordinator.isLeader()) {
       log.info("Coordination has been paused. Duties will not run until 
coordination is resumed.");
       return;
     }
 
+    markRunStarted();
     final CoordinatorRunStats stats = params.getCoordinatorStats();
     for (CoordinatorDuty duty : duties) {
       if (coordinator.isLeader()) {
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
index 72b1852c043..b0b15f86fb9 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/duty/DutyGroupStatus.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator.duty;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.joda.time.DateTime;
 import org.joda.time.Duration;
@@ -37,14 +38,15 @@ public class DutyGroupStatus
   private final long avgRuntimeMillis;
   private final long avgRunGapMillis;
 
+  @JsonCreator
   public DutyGroupStatus(
-      String name,
-      Duration period,
-      List<String> dutyNames,
-      DateTime lastRunStart,
-      DateTime lastRunEnd,
-      long avgRuntimeMillis,
-      long avgRunGapMillis
+      @JsonProperty("name") String name,
+      @JsonProperty("period") Duration period,
+      @JsonProperty("dutyNames") List<String> dutyNames,
+      @JsonProperty("lastRunStart") DateTime lastRunStart,
+      @JsonProperty("lastRunEnd") DateTime lastRunEnd,
+      @JsonProperty("avgRuntimeMillis") long avgRuntimeMillis,
+      @JsonProperty("avgRunGapMillis") long avgRunGapMillis
   )
   {
     this.name = name;
diff --git 
a/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java 
b/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java
index a7b8cdf7c4a..a924a669fbd 100644
--- 
a/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java
+++ 
b/server/src/main/java/org/apache/druid/server/http/CoordinatorDutyStatus.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.http;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.server.coordinator.duty.DutyGroupStatus;
 
@@ -28,7 +29,10 @@ public class CoordinatorDutyStatus
 {
   private final List<DutyGroupStatus> dutyGroups;
 
-  public CoordinatorDutyStatus(List<DutyGroupStatus> dutyGroups)
+  @JsonCreator
+  public CoordinatorDutyStatus(
+      @JsonProperty("dutyGroups") List<DutyGroupStatus> dutyGroups
+  )
   {
     this.dutyGroups = dutyGroups;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to