This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new ae17ba4adc GEODE-10020: Introduction of option to gradually activate
pinging (#7517)
ae17ba4adc is described below
commit ae17ba4adce09e51f91d8bb3813beeed8cbf5569
Author: Mario Ivanac <[email protected]>
AuthorDate: Wed Apr 27 22:02:44 2022 +0200
GEODE-10020: Introduction of option to gradually activate pinging (#7517)
* GEODE-10020: Introduction of option to gradually activate pinging toward
destination
---
...iversWithSamePortAndHostnameForSendersTest.java | 61 ++++++++++++++++-
.../cache/client/internal/LiveServerPinger.java | 41 +++++++++++-
.../client/internal/LiveServerPingerTest.java | 78 ++++++++++++++++++++++
3 files changed, 177 insertions(+), 3 deletions(-)
diff --git
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
index 8bad48f570..682b132638 100644
---
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
+++
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
@@ -58,6 +58,7 @@ import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.DistributedRule;
import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.util.internal.GeodeGlossary;
/**
@@ -214,6 +215,54 @@ public class
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
}
+
+ /**
+ * Verifies that when several gateway receivers in a remote site share the same port and
+ * hostname-for-senders, the pings sent from the gateway senders reach the right gateway
+ * receiver and not just any of the receivers, and that only one additional connection is
+ * used.
+ *
+ * <p>
+ * The sender member (vm1) sets the system property named by
+ * {@code GeodeGlossary.GEMFIRE_PREFIX} followed by
+ * {@code LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS} to 500 before creating its
+ * cache. After 10 puts the test waits until the sender queue is empty and then expects the
+ * sender pool statistics to report 0 disconnects and 4 connects.
+ */
+ @Test
+ public void
testPingsToReceiversWithSamePortAndHostnameForSendersUseOnlyOneMoreConnection()
+ throws InterruptedException {
+ String senderId = "ln";
+ String regionName = "region-wan";
+ final int remoteLocPort = docker.getExternalPortForService("haproxy",
20334);
+
+ int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+ VM vm1 = VM.getVM(1);
+ // NOTE(review): the multiplier is read in a static initializer of LiveServerPinger, so it presumably has to be set in vm1 before that class is loaded there - confirm.
+ vm1.invoke(() -> {
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX
+ + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS",
+ "500"); // each further endpoint's first ping is delayed by an extra 500 ms
+
+ Properties props = new Properties();
+ props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+ CacheFactory cacheFactory = new CacheFactory(props);
+ cache = cacheFactory.create();
+ });
+
+ createGatewaySender(vm1, senderId, 2, true, 5,
+ 2, GatewaySender.DEFAULT_ORDER_POLICY);
+
+ createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+ int NUM_PUTS = 10;
+
+ putKeyValues(vm1, NUM_PUTS, regionName);
+ // Wait until nothing is left in the sender queue before inspecting the pool statistics.
+ await().untilAsserted(() -> assertThat(getQueuedEvents(vm1,
senderId)).isEqualTo(0));
+
+ await().untilAsserted(() -> assertThat(getSenderPoolDisconnects(vm1,
senderId)).isEqualTo(0));
+ // The connect count is the statistic that reflects the "only one more connection" expectation.
+ await().untilAsserted(() -> assertThat(getSenderPoolConnects(vm1,
senderId)).isEqualTo(4));
+ }
+
+
+
private boolean allDispatchersConnectedToSameReceiver(int server) {
String gfshOutput = runListGatewayReceiversCommandInServer(server);
@@ -351,12 +400,22 @@ public class
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
return vm.invoke(() -> {
AbstractGatewaySender sender =
(AbstractGatewaySender)
CacheFactory.getAnyInstance().getGatewaySender(senderId);
- assertNotNull(sender);
+ assertThat(sender).isNotNull();
PoolStats poolStats = sender.getProxy().getStats();
return poolStats.getDisConnects();
});
}
+ private static int getSenderPoolConnects(VM vm, String senderId) { // Returns the "connects" pool statistic of the gateway sender senderId, read inside vm.
+ return vm.invoke(() -> {
+ AbstractGatewaySender sender =
+ (AbstractGatewaySender)
CacheFactory.getAnyInstance().getGatewaySender(senderId);
+ assertThat(sender).isNotNull(); // fail here if the sender does not exist in that VM
+ PoolStats poolStats = sender.getProxy().getStats();
+ return poolStats.getConnects();
+ });
+ }
+
private static void putKeyValues(VM vm, int numPuts, String region) {
final HashMap<Integer, Integer> keyValues = new HashMap<>();
for (int i = 0; i < numPuts; i++) {
diff --git
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
index ff91da5a87..fd00a4a9be 100644
---
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
+++
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
@@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
@@ -27,6 +28,7 @@ import
org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.util.internal.GeodeGlossary;
/**
* Responsible for pinging live servers to make sure they are still alive.
@@ -41,26 +43,48 @@ public class LiveServerPinger extends
EndpointListenerAdapter {
protected final InternalPool pool;
protected final long pingIntervalNanos;
+ /**
+ * Offset, in milliseconds, by which the initial delay of successive LiveServerPinger tasks is
+ * staggered: the n-th endpoint registered since the last reset (counting from 0) has its first
+ * ping delayed by an extra {@code n * INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS} on top of the
+ * regular ping interval.
+ *
+ * <p>
+ * The value is read once, when this class is initialized, from the system property named by
+ * {@code GeodeGlossary.GEMFIRE_PREFIX} followed by
+ * {@code LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS}. It defaults to 0, in which
+ * case no extra offset is added.
+ */
+ public static final int INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS =
+ Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX
+ + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS", 0);
+
+ private final AtomicInteger initialDelayIndex = new AtomicInteger(0); // number of calculateInitialDelay() calls since the last resetInitialDelay()
+
+
public LiveServerPinger(InternalPool pool) { // Creates a pinger for pool and derives the ping period from the pool's ping interval.
this.pool = pool;
- pingIntervalNanos = ((pool.getPingInterval() + 1) / 2) * NANOS_PER_MS;
+ pingIntervalNanos = TimeUnit.MILLISECONDS.toNanos((pool.getPingInterval()
+ 1) / 2); // (interval + 1) / 2 milliseconds, i.e. half the ping interval with odd values rounded up, converted to nanoseconds
}
@Override
public void endpointCrashed(Endpoint endpoint) {
+ resetInitialDelay(); // restart the initial-delay sequence for endpoints registered from now on
cancelFuture(endpoint);
}
@Override
public void endpointNoLongerInUse(Endpoint endpoint) {
+ resetInitialDelay(); // restart the initial-delay sequence for endpoints registered from now on
cancelFuture(endpoint);
}
+ /**
+ * At each registration of new endpoint increase counter for calculation of
initial delay offset
+ *
+ */
@Override
public void endpointNowInUse(Endpoint endpoint) {
try {
+ long initDelay = calculateInitialDelay();
+
+ // initDelay - the time to delay first execution
+ // pingIntervalNanos - the delay between the termination of one
execution and the commencement
+ // of the next
+
Future future = pool.getBackgroundProcessor().scheduleWithFixedDelay(new
PingTask(endpoint),
- pingIntervalNanos, pingIntervalNanos, TimeUnit.NANOSECONDS);
+ initDelay, pingIntervalNanos, TimeUnit.NANOSECONDS);
taskFutures.put(endpoint, future);
} catch (RejectedExecutionException e) {
if (!pool.getCancelCriterion().isCancelInProgress()) {
@@ -76,6 +100,19 @@ public class LiveServerPinger extends
EndpointListenerAdapter {
}
}
+
+ long calculateInitialDelay() { // Returns the initial delay, in nanoseconds, for the next ping task to be scheduled; every call advances the index.
+ long initDelay = initialDelayIndex.getAndIncrement(); // 0 on the first call after a reset, then 1, 2, ...
+ initDelay =
+ TimeUnit.MILLISECONDS.toNanos(initDelay *
INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS)
+ + pingIntervalNanos; // index * multiplier (ms converted to ns) on top of the regular ping interval
+ return initDelay;
+ }
+
+ void resetInitialDelay() { // Restarts the sequence: the next calculateInitialDelay() call returns just pingIntervalNanos.
+ initialDelayIndex.set(0);
+ }
+
private class PingTask extends PoolTask {
private final Endpoint endpoint;
diff --git
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java
new file mode 100644
index 0000000000..15f4ca5ebd
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.geode.util.internal.GeodeGlossary;
+
+public class LiveServerPingerTest { // Unit tests for LiveServerPinger.calculateInitialDelay() and resetInitialDelay().
+
+ private InternalPool pool;
+
+ private LiveServerPinger lsp;
+ // NOTE(review): the three values below are used as constants but are declared as non-final statics.
+ private static long PING_INTERVAL = 10L; // value the mocked pool returns from getPingInterval()
+ private static long DEFAULT_PING_INTERVAL_NANOS = 5000000L; // expected base delay: ((10 + 1) / 2) ms expressed in nanoseconds
+
+ private static long CONFIG_PING_INTERVAL_NANOS = 1000000L; // expected increase per call: the 1 ms multiplier set in init(), in nanoseconds
+
+ // Sets the initial-delay multiplier to 1 ms and builds a pinger around a mocked pool.
+ @BeforeEach
+ public void init() throws Exception {
+ System.setProperty(
+ GeodeGlossary.GEMFIRE_PREFIX +
"LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS",
+ "1"); // NOTE(review): the property feeds a static final read once at class initialization - presumably ineffective if LiveServerPinger was loaded earlier in this JVM; it is also never cleared.
+
+ pool = mock(InternalPool.class);
+ when(pool.getPingInterval()).thenReturn(PING_INTERVAL);
+
+ lsp = new LiveServerPinger(pool);
+ }
+
+ @Test
+ public void testInitialDelay() throws Exception {
+ // Each successive call is expected to add one more increment to the base delay.
+
assertThat(lsp.calculateInitialDelay()).isEqualTo(DEFAULT_PING_INTERVAL_NANOS);
+ assertThat(lsp.calculateInitialDelay())
+ .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + CONFIG_PING_INTERVAL_NANOS);
+ assertThat(lsp.calculateInitialDelay())
+ .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (2 *
CONFIG_PING_INTERVAL_NANOS));
+ assertThat(lsp.calculateInitialDelay())
+ .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (3 *
CONFIG_PING_INTERVAL_NANOS));
+
+ }
+
+ @Test
+ public void testInitialDelayWithReset() throws Exception {
+ // After resetInitialDelay() the sequence is expected to start again from the base delay.
+
assertThat(lsp.calculateInitialDelay()).isEqualTo(DEFAULT_PING_INTERVAL_NANOS);
+ assertThat(lsp.calculateInitialDelay())
+ .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + CONFIG_PING_INTERVAL_NANOS);
+ assertThat(lsp.calculateInitialDelay())
+ .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (2 *
CONFIG_PING_INTERVAL_NANOS));
+ lsp.resetInitialDelay();
+ assertThat(lsp.calculateInitialDelay())
+ .isEqualTo(DEFAULT_PING_INTERVAL_NANOS);
+
+ }
+
+}