This is an automated email from the ASF dual-hosted git repository.
lokiore pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new ce316af3fb PHOENIX-7870 :- Per-HA-group poller futures and url1/url2
alternation in GetClusterRoleRecordUtil (#2490)
ce316af3fb is described below
commit ce316af3fb2f1c315c541508e14ae864a30be413
Author: Lokesh Khurana <[email protected]>
AuthorDate: Mon Jun 1 21:48:23 2026 -0700
PHOENIX-7870 :- Per-HA-group poller futures and url1/url2 alternation in
GetClusterRoleRecordUtil (#2490)
* PHOENIX-7870 :- Per-HA-group poller futures and url1/url2 alternation in
GetClusterRoleRecordUtil
Bug 1 — Per-HA-group future tracking
Replaces the single static volatile pollerFuture field with a
ConcurrentHashMap<String, ScheduledFuture<?>> keyed by haGroupName. The old
field was overwritten on every schedulePoller call regardless of
haGroupName, so cancelling one HA group's poller would target whichever
future was scheduled most recently — possibly belonging to a different HA
group. The existing schedulerMap is handled symmetrically: on the
active-CRR cancel path its entry is now removed and the executor shut down
inside a finally block, so cleanup still runs if refreshClusterRoleRecord
throws.
Bug 2 — url1/url2 alternation each tick
Replaces the single-URL poller (which would stall progress if its target
cluster's RegionServer Endpoint became transiently unreachable while the
peer cluster held the Active role) with even/odd-tick alternation between
url1 and url2: even ticks poll url1, odd ticks poll url2. Method signatures
updated: schedulePoller now accepts both URLs, and fetchClusterRoleRecord
accepts url1, url2 and a primaryUrl that selects the target of the initial
(non-poller) fetch.
Generated-by: Claude Code (Opus 4.7)
---
.../apache/phoenix/jdbc/HighAvailabilityGroup.java | 14 +-
.../phoenix/util/GetClusterRoleRecordUtil.java | 145 ++++++++++++-----
.../phoenix/util/GetClusterRoleRecordUtilTest.java | 173 +++++++++++++++++++++
3 files changed, 286 insertions(+), 46 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
index 77c7e75c13..6b9728e797 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/HighAvailabilityGroup.java
@@ -973,14 +973,14 @@ public class HighAvailabilityGroup {
try {
// Get the CRR via RSEndpoint for cluster 1
ClusterRoleRecord roleRecord =
GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(),
- info.getName(), this, pollerInterval, properties);
+ info.getUrl2(), info.getUrl1(), info.getName(), this, pollerInterval,
properties);
// If we have unknown role for any cluster then try getting CRR from
cluster 2 endpoint and if
// we get unknown role from there as well then CRR with higher
adminVersion wins.
if (roleRecord.hasUnknownRole()) {
ClusterRoleRecord roleRecordFromPR;
try {
- roleRecordFromPR =
GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(),
- info.getName(), this, pollerInterval, properties);
+ roleRecordFromPR =
GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(),
+ info.getUrl2(), info.getUrl2(), info.getName(), this,
pollerInterval, properties);
} catch (Exception e) {
// As we were able to get CRR from cluster 1 but cluster 2 threw
exception then just
// return
@@ -1009,16 +1009,16 @@ public class HighAvailabilityGroup {
== SQLExceptionCode.CLUSTER_ROLE_RECORD_NOT_FOUND.getErrorCode()
) {
try {
- return
GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(), info.getName(),
- this, pollerInterval, properties);
+ return
GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(), info.getUrl2(),
+ info.getUrl2(), info.getName(), this, pollerInterval, properties);
} catch (Exception ignoredEx) {
throw (SQLException) e;
}
}
// If caught exception is not CRR not found, then just try cluster 2
endpoint.
- return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl2(),
info.getName(), this,
- pollerInterval, properties);
+ return GetClusterRoleRecordUtil.fetchClusterRoleRecord(info.getUrl1(),
info.getUrl2(),
+ info.getUrl2(), info.getName(), this, pollerInterval, properties);
}
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java
index 4e43c72feb..e021c8cc98 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/GetClusterRoleRecordUtil.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
@@ -52,13 +53,22 @@ public class GetClusterRoleRecordUtil {
private static final Logger LOGGER =
LoggerFactory.getLogger(GetClusterRoleRecordUtil.class);
/**
- * Scheduler to fetch ClusterRoleRecord until we get an Active
ClusterRoleRecord
+ * Per-HA-group scheduler executors, keyed on haGroupName so multiple HA
groups can run pollers
+ * independently without sharing or overwriting each other's lifecycle state.
*/
- private static Map<String, ScheduledExecutorService> schedulerMap = new
ConcurrentHashMap<>();
+ private static final Map<String, ScheduledExecutorService> schedulerMap =
+ new ConcurrentHashMap<>();
- private static final Object pollerLock = new Object();
+ /**
+ * Per-HA-group poller futures, keyed on haGroupName. A previous
implementation kept a single
+ * static {@code pollerFuture} field that was overwritten by each new {@code
schedulePoller}
+ * call regardless of haGroupName, so cancelling the poller for one HA group
would cancel the
+ * future most recently scheduled (which may belong to a different HA
group). Keying by
+ * haGroupName ensures each group's future is cancelled independently.
+ */
+ private static final Map<String, ScheduledFuture<?>> futureMap = new
ConcurrentHashMap<>();
- private static volatile ScheduledFuture<?> pollerFuture = null;
+ private static final Object pollerLock = new Object();
private GetClusterRoleRecordUtil() {
}
@@ -131,48 +141,65 @@ public class GetClusterRoleRecordUtil {
* until we get an Active ClusterRoleRecord (one role should be Active) if
we receive an Active
* roleRecord then client this method will return the roleRecord to be
consumed and used, if not
* then it will start a poller and return non-active roleRecord.
- * @param url URL of the RegionServer Endpoint Service
+ * <p>
+ * The poller alternates between {@code url1} and {@code url2} on successive
ticks so a transient
+ * outage on one cluster does not stall progress; both URLs are passed in
even though the initial
+ * fetch only targets one of them (selected by the caller via the {@code
primaryUrl} hint).
+ * @param url1 URL of the RegionServer Endpoint Service for
cluster 1
+ * @param url2 URL of the RegionServer Endpoint Service for
cluster 2
+ * @param primaryUrl URL to use for the initial (non-poller) fetch; must
be either url1 or
+ * url2
* @param haGroupName Name of the HA group
- * @param properties Connection properties
- * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
* @param haGroup HighAvailabilityGroup object to refresh the
ClusterRoleRecord when an
* Active CRR is found
+ * @param pollerInterval Interval in milliseconds to poll for
ClusterRoleRecord
+ * @param properties Connection properties
* @throws SQLException if there is an error getting the ClusterRoleRecord
*/
- public static ClusterRoleRecord fetchClusterRoleRecord(String url, String
haGroupName,
- HighAvailabilityGroup haGroup, long pollerInterval, Properties properties)
throws SQLException {
- ClusterRoleRecord clusterRoleRecord = getClusterRoleRecord(url,
haGroupName, true, properties);
+ public static ClusterRoleRecord fetchClusterRoleRecord(String url1, String
url2,
+ String primaryUrl, String haGroupName, HighAvailabilityGroup haGroup, long
pollerInterval,
+ Properties properties) throws SQLException {
+ ClusterRoleRecord clusterRoleRecord =
+ getClusterRoleRecord(primaryUrl, haGroupName, true, properties);
if (
clusterRoleRecord.getPolicy() == HighAvailabilityPolicy.FAILOVER
&& !clusterRoleRecord.getRole1().isActive() &&
!clusterRoleRecord.getRole2().isActive()
) {
LOGGER.info(
- "Non-active ClusterRoleRecord found for HA group {}. Scheduling poller
to check every {} seconds,"
- + "until we find an ACTIVE CRR",
+ "Non-active ClusterRoleRecord found for HA group {}. Scheduling poller
to check every {} ms,"
+ + " alternating between url1 and url2 until we find an ACTIVE CRR",
haGroupName, pollerInterval);
- // Schedule a poller to fetch ClusterRoleRecord every 5 seconds (or
configured value)
+ // Schedule a poller to fetch ClusterRoleRecord every pollerInterval
milliseconds
// until we get an Active ClusterRoleRecord and return the Non-Active CRR
- schedulePoller(url, haGroupName, haGroup, pollerInterval, properties);
+ schedulePoller(url1, url2, haGroupName, haGroup, pollerInterval,
properties);
}
return clusterRoleRecord;
}
/**
- * Method to schedule a poller to fetch ClusterRoleRecord every
pollerInterval seconds until we
- * get an Active ClusterRoleRecord, poller will only start if client will
receive, a Non-Active
- * roleRecord (means either of the roles are not Active and client can't
create a connection)
- * @param url URL of the RegionServer Endpoint Service
+ * Method to schedule a poller to fetch ClusterRoleRecord every
pollerInterval milliseconds until
+ * we get an Active ClusterRoleRecord. Poller will only start if client
received a Non-Active
+ * roleRecord (means neither role is Active and client can't create a
connection).
+ * <p>
+ * The poller alternates between {@code url1} and {@code url2} on successive
ticks. Alternating
+ * (rather than pinning to a single URL) avoids stalling if the chosen
cluster's RegionServer
+ * Endpoint is transiently unreachable while the peer cluster is healthy and
may already hold
+ * the Active role.
+ * <p>
+ * Each haGroupName gets its own entry in {@link #futureMap} and {@link
#schedulerMap}. Multiple
+ * HA groups can therefore run independent pollers; cancellation of one HA
group's poller does
+ * not interfere with another's lifecycle.
+ * @param url1 URL of the RegionServer Endpoint Service for
cluster 1
+ * @param url2 URL of the RegionServer Endpoint Service for
cluster 2
* @param haGroupName Name of the HA group
* @param haGroup HighAvailabilityGroup object to refresh the
ClusterRoleRecord when an
* Active CRR is found
- * @param pollerInterval Interval in seconds to poll for ClusterRoleRecord
+ * @param pollerInterval Interval in milliseconds to poll for
ClusterRoleRecord
* @param properties Connection properties
- * @throws SQLException if there is an error getting or refreshing the
ClusterRoleRecord when an
- * Active CRR is found
*/
- private static void schedulePoller(String url, String haGroupName,
HighAvailabilityGroup haGroup,
- long pollerInterval, Properties properties) {
+ private static void schedulePoller(String url1, String url2, String
haGroupName,
+ HighAvailabilityGroup haGroup, long pollerInterval, Properties properties)
{
synchronized (pollerLock) {
if (schedulerMap.containsKey(haGroupName) &&
!schedulerMap.get(haGroupName).isShutdown()) {
@@ -181,34 +208,50 @@ public class GetClusterRoleRecordUtil {
}
schedulerMap.put(haGroupName, Executors.newScheduledThreadPool(1));
- LOGGER.info("Starting poller for HA group {} to check every {}
milliseconds.", haGroupName,
- pollerInterval);
+ LOGGER.info(
+ "Starting poller for HA group {} to check every {} milliseconds,
alternating between {}"
+ + " and {}.",
+ haGroupName, pollerInterval, url1, url2);
+ AtomicLong tickCount = new AtomicLong(0);
Runnable pollingTask = () -> {
+ // Increment unconditionally so a failed tick still alternates next
iteration.
+ long tick = tickCount.getAndIncrement();
+ String tickUrl = selectUrlForTick(url1, url2, tick);
try {
- ClusterRoleRecord polledCrr = getClusterRoleRecord(url, haGroupName,
true, properties);
- LOGGER.info("Polled CRR: {}", polledCrr);
+ ClusterRoleRecord polledCrr =
+ getClusterRoleRecord(tickUrl, haGroupName, true, properties);
+ LOGGER.info("Polled CRR for HA group {} via {}: {}", haGroupName,
tickUrl, polledCrr);
if (polledCrr.getRole1().isActive() ||
polledCrr.getRole2().isActive()) {
- LOGGER.info("Active ClusterRoleRecord found. Cancelling poller.");
+ LOGGER.info("Active ClusterRoleRecord found for HA group {}.
Cancelling poller.",
+ haGroupName);
synchronized (pollerLock) {
- if (pollerFuture != null) {
- pollerFuture.cancel(false);
+ ScheduledFuture<?> future = futureMap.remove(haGroupName);
+ if (future != null) {
+ future.cancel(false);
+ }
+ try {
+ // Refresh ClusterRoleRecord for the HAGroup with appropriate
transition
+ haGroup.refreshClusterRoleRecord(true);
+ } finally {
+ ScheduledExecutorService scheduler =
schedulerMap.remove(haGroupName);
+ if (scheduler != null) {
+ scheduler.shutdown();
+ }
}
- // Refresh ClusterRoleRecord for the HAGroup with appropriate
transition
- haGroup.refreshClusterRoleRecord(true);
- schedulerMap.get(haGroupName).shutdown();
- schedulerMap.remove(haGroupName);
}
}
} catch (SQLException e) {
- LOGGER.error("Exception found while polling for ClusterRoleRecord on
{}: {}", url,
- e.getMessage());
+ LOGGER.error("Exception found while polling for ClusterRoleRecord on
{} for HA group"
+ + " {}: {}", tickUrl, haGroupName, e.getMessage());
}
};
- // Schedule the task with a fixed delay
- pollerFuture =
schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask, 0,
- pollerInterval, TimeUnit.MILLISECONDS);
+ // Schedule the task with a fixed delay; keyed by haGroupName so each HA
group's future is
+ // independently cancellable.
+ ScheduledFuture<?> future =
schedulerMap.get(haGroupName).scheduleWithFixedDelay(pollingTask,
+ 0, pollerInterval, TimeUnit.MILLISECONDS);
+ futureMap.put(haGroupName, future);
}
}
@@ -241,4 +284,28 @@ public class GetClusterRoleRecordUtil {
propsCopy.remove(PHOENIX_HA_GROUP_ATTR);
return DriverManager.getConnection(url, propsCopy);
}
+
+ /**
+ * Pick which URL the poller should target on tick {@code tick}. Even ticks
(0, 2, 4, ...)
+ * select {@code url1}; odd ticks select {@code url2}. Package-private for
unit-test access.
+ */
+ static String selectUrlForTick(String url1, String url2, long tick) {
+ return (tick % 2 == 0) ? url1 : url2;
+ }
+
+ /**
+ * Test-only accessor for the per-HA-group future map. Package-private so
unit tests can verify
+ * that distinct HA groups produce distinct entries (and that cancellation
removes only the
+ * corresponding entry).
+ */
+ static Map<String, ScheduledFuture<?>> getFutureMapForTesting() {
+ return futureMap;
+ }
+
+ /**
+ * Test-only accessor for the per-HA-group scheduler map. Package-private
for unit-test use.
+ */
+ static Map<String, ScheduledExecutorService> getSchedulerMapForTesting() {
+ return schedulerMap;
+ }
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java
new file mode 100644
index 0000000000..c8ebe40dbc
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/util/GetClusterRoleRecordUtilTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link GetClusterRoleRecordUtil}. These tests cover two
regressions in the
+ * non-active CRR poller infrastructure:
+ *
+ * <ol>
+ * <li>Per-HA-group future tracking: a previous implementation kept a single
static
+ * {@code pollerFuture} field that was overwritten by every {@code
schedulePoller} invocation
+ * regardless of HA group, so cancelling one HA group's poller would target
whichever future was
+ * scheduled most recently — possibly belonging to a different HA group. The
fix keys both the
+ * scheduler executor and the future on {@code haGroupName} via concurrent
maps.</li>
+ * <li>URL alternation each tick: a previous implementation pinned each
scheduled poller to a
+ * single URL passed in at schedule time. If that cluster's RegionServer
Endpoint became
+ * transiently unreachable, the poller could never observe the peer cluster's
CRR even after
+ * the peer became Active. The fix has the poller alternate between url1 and
url2 on each tick.
+ * </li>
+ * </ol>
+ */
+public class GetClusterRoleRecordUtilTest {
+
+ private static final String URL_1 = "phoenix+rpc:cluster1.example.com:2181";
+ private static final String URL_2 = "phoenix+rpc:cluster2.example.com:2181";
+ private static final String HA_GROUP_A = "haGroupA";
+ private static final String HA_GROUP_B = "haGroupB";
+
+ @Before
+ public void clearStateBefore() {
+ GetClusterRoleRecordUtil.getFutureMapForTesting().clear();
+ GetClusterRoleRecordUtil.getSchedulerMapForTesting().clear();
+ }
+
+ @After
+ public void clearStateAfter() {
+ // Ensure nothing leaks across tests; the maps are static class state.
+ GetClusterRoleRecordUtil.getFutureMapForTesting().clear();
+ GetClusterRoleRecordUtil.getSchedulerMapForTesting().clear();
+ }
+
+ /**
+ * URL-alternation core: even ticks pick url1, odd ticks pick url2. This
verifies the helper
+ * that the poller calls each tick to choose its target URL.
+ */
+ @Test
+ public void testSelectUrlForTickAlternates() {
+ assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 0L));
+ assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 1L));
+ assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 2L));
+ assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 3L));
+ assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 100L));
+ assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 101L));
+ }
+
+ /**
+ * Defensive: large tick counts should still alternate cleanly. Guards
against accidental sign
+ * issues if a long tick value approaches Long.MAX_VALUE.
+ */
+ @Test
+ public void testSelectUrlForTickHandlesLargeTickValues() {
+ assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 1_000_000L));
+ assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, 1_000_001L));
+ assertEquals(URL_1, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, Long.MAX_VALUE - 1));
+ assertEquals(URL_2, GetClusterRoleRecordUtil.selectUrlForTick(URL_1,
URL_2, Long.MAX_VALUE));
+ }
+
+ /**
+ * Per-HA-group future map: distinct HA group names produce distinct map
entries. Verifies the
+ * data-structure invariant that replaces the prior single-static-{@code
pollerFuture} field.
+ */
+ @Test
+ public void testFutureMapIsolatesEntriesPerHaGroup() {
+ Map<String, ScheduledFuture<?>> futureMap =
GetClusterRoleRecordUtil.getFutureMapForTesting();
+ Map<String, ScheduledExecutorService> schedulerMap =
+ GetClusterRoleRecordUtil.getSchedulerMapForTesting();
+ assertTrue("futureMap should start empty", futureMap.isEmpty());
+ assertTrue("schedulerMap should start empty", schedulerMap.isEmpty());
+
+ ScheduledFuture<?> futureA = mock(ScheduledFuture.class);
+ ScheduledFuture<?> futureB = mock(ScheduledFuture.class);
+ ScheduledExecutorService schedulerA = mock(ScheduledExecutorService.class);
+ ScheduledExecutorService schedulerB = mock(ScheduledExecutorService.class);
+
+ futureMap.put(HA_GROUP_A, futureA);
+ futureMap.put(HA_GROUP_B, futureB);
+ schedulerMap.put(HA_GROUP_A, schedulerA);
+ schedulerMap.put(HA_GROUP_B, schedulerB);
+
+ assertEquals(2, futureMap.size());
+ assertEquals(2, schedulerMap.size());
+ assertNotNull(futureMap.get(HA_GROUP_A));
+ assertNotNull(futureMap.get(HA_GROUP_B));
+ // Distinct entries — adding HA_GROUP_B did not overwrite HA_GROUP_A's
entry.
+ assertFalse("entries for distinct HA groups must be different references",
+ futureMap.get(HA_GROUP_A) == futureMap.get(HA_GROUP_B));
+ }
+
+ /**
+ * Removal/cancellation isolation: cancelling one HA group's poller cancels
only that group's
+ * future and does not touch the peer group's future. This is the key
behavioural invariant
+ * the prior single-static field violated.
+ */
+ @Test
+ public void testCancelOneHaGroupDoesNotCancelOthers() {
+ Map<String, ScheduledFuture<?>> futureMap =
GetClusterRoleRecordUtil.getFutureMapForTesting();
+ Map<String, ScheduledExecutorService> schedulerMap =
+ GetClusterRoleRecordUtil.getSchedulerMapForTesting();
+
+ ScheduledFuture<?> futureA = mock(ScheduledFuture.class);
+ ScheduledFuture<?> futureB = mock(ScheduledFuture.class);
+ ScheduledExecutorService schedulerA = mock(ScheduledExecutorService.class);
+ ScheduledExecutorService schedulerB = mock(ScheduledExecutorService.class);
+ when(futureA.cancel(false)).thenReturn(true);
+ when(futureB.cancel(false)).thenReturn(true);
+
+ futureMap.put(HA_GROUP_A, futureA);
+ futureMap.put(HA_GROUP_B, futureB);
+ schedulerMap.put(HA_GROUP_A, schedulerA);
+ schedulerMap.put(HA_GROUP_B, schedulerB);
+
+ // Mirror the cancel-on-active path inside schedulePoller: remove the
entry for HA_GROUP_A,
+ // cancel its future, shut down its scheduler. HA_GROUP_B's entries must
be untouched.
+ ScheduledFuture<?> removedFuture = futureMap.remove(HA_GROUP_A);
+ assertNotNull(removedFuture);
+ removedFuture.cancel(false);
+ ScheduledExecutorService removedScheduler =
schedulerMap.remove(HA_GROUP_A);
+ assertNotNull(removedScheduler);
+ removedScheduler.shutdown();
+
+ verify(futureA, times(1)).cancel(false);
+ verify(futureB, never()).cancel(false);
+ verify(schedulerA, times(1)).shutdown();
+ verify(schedulerB, never()).shutdown();
+ assertNull("HA_GROUP_A entry should be removed",
futureMap.get(HA_GROUP_A));
+ assertNotNull("HA_GROUP_B entry should remain", futureMap.get(HA_GROUP_B));
+ assertEquals(1, futureMap.size());
+ assertEquals(1, schedulerMap.size());
+ }
+}