This is an automated email from the ASF dual-hosted git repository.
mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 7d7a98b103 GEODE-10331: schedule delayed CloseEndpoint (#7849)
7d7a98b103 is described below
commit 7d7a98b10355cb25985c031bfd2a67c77f1b6e43
Author: Mario Ivanac <[email protected]>
AuthorDate: Mon Sep 19 08:17:42 2022 +0200
GEODE-10331: schedule delayed CloseEndpoint (#7849)
* GEODE-10331: schedule delayed CloseEndpoint
* GEODE-10331: added TCs
---
.../distributed/internal/DistributionImpl.java | 20 +----
.../distributed/internal/direct/DirectChannel.java | 46 ++++++++++
.../distributed/internal/DistributionTest.java | 31 ++++---
.../internal/direct/DirectChannelTest.java | 100 +++++++++++++++++++++
4 files changed, 168 insertions(+), 29 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
index fcba8e4c3e..68059bb898 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionImpl.java
@@ -74,7 +74,6 @@ import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.tcp.ConnectExceptions;
import org.apache.geode.internal.tcp.ConnectionException;
import org.apache.geode.internal.util.Breadcrumbs;
-import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
@@ -648,28 +647,13 @@ public class DistributionImpl implements Distribution {
}
}
- private void destroyMember(final InternalDistributedMember member, final String reason) {
+ void destroyMember(final InternalDistributedMember member, final String reason) {
final DirectChannel dc = directChannel;
if (dc != null) {
// Bug 37944: make sure this is always done in a separate thread,
// so that shutdown conditions don't wedge the view lock
// fix for bug 34010
- new LoggingThread("disconnect thread for " + member, () -> {
- try {
- Thread.sleep(Integer.getInteger("p2p.disconnectDelay", 3000));
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- // Keep going, try to close the endpoint.
- }
- if (!dc.isOpen()) {
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Membership: closing connections for departed member {}", member);
- }
- // close connections, but don't do membership notification since it's already been done
- dc.closeEndpoint(member, reason, false);
- }).start();
+ dc.scheduleCloseEndpoint(member, reason, false);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index eaac79f2b8..55108741a5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -24,6 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
@@ -56,6 +59,7 @@ import org.apache.geode.internal.tcp.ConnectionException;
import org.apache.geode.internal.tcp.MsgStreamer;
import org.apache.geode.internal.tcp.TCPConduit;
import org.apache.geode.internal.util.Breadcrumbs;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
@@ -86,6 +90,11 @@ public class DirectChannel {
InternalDistributedMember localAddr;
+ private ScheduledExecutorService closeEndpointExecutor;
+
+ private final int CLOSE_ENDPOINT_POOL_SIZE =
+ Integer.getInteger("DirectChannel.CLOSE_ENDPOINT_POOL_SIZE", 1);
+
/**
* Callback to set the local address, must be done before this channel is used.
*
@@ -147,6 +156,9 @@ public class DirectChannel {
logger.info("GemFire P2P Listener started on {}", conduit.getSocketId());
+ closeEndpointExecutor = LoggingExecutors.newScheduledThreadPool(CLOSE_ENDPOINT_POOL_SIZE,
+ "DirectChannel.closeEndpoint", false);
+
} catch (ConnectionException ce) {
logger.fatal(String.format("Unable to initialize direct channel because: %s",
ce.getMessage()),
@@ -667,6 +679,7 @@ public class DirectChannel {
public synchronized void disconnect(Exception cause) {
disconnected = true;
disconnectCompleted = false;
+ closeEndpointExecutor.shutdownNow();
conduit.stop(cause);
disconnectCompleted = true;
}
@@ -765,4 +778,37 @@ public class DirectChannel {
public boolean hasReceiversFor(DistributedMember mbr) {
return conduit.hasReceiversFor(mbr);
}
+
+ public void scheduleCloseEndpoint(InternalDistributedMember member, String reason,
+ boolean notifyDisconnect) {
+ if (disconnected) {
+ return;
+ }
+ closeEndpointExecutor.schedule(new CloseEndpointRunnable(member, reason, notifyDisconnect),
+ Integer.getInteger("p2p.disconnectDelay", 3000), TimeUnit.MILLISECONDS);
+ }
+
+ int getCloseEndpointExecutorQueueSize() {
+ ScheduledThreadPoolExecutor implementation =
+ (ScheduledThreadPoolExecutor) closeEndpointExecutor;
+ return implementation.getQueue().size();
+ }
+
+ public class CloseEndpointRunnable implements Runnable {
+
+ protected final InternalDistributedMember member;
+ protected final String reason;
+ protected final boolean notifyDisconnect;
+
+ public CloseEndpointRunnable(InternalDistributedMember member, String reason, boolean notify) {
+ this.member = member;
+ this.reason = reason;
+ this.notifyDisconnect = notify;
+ }
+
+ @Override
+ public void run() {
+ closeEndpoint(member, reason, notifyDisconnect);
+ }
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java
index 029a5b99b8..f3844ce47b 100644
--- a/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/DistributionTest.java
@@ -15,13 +15,13 @@
package org.apache.geode.distributed.internal;
import static org.apache.geode.distributed.internal.DistributionImpl.EMPTY_MEMBER_ARRAY;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@@ -109,7 +109,7 @@ public class DistributionTest {
m.setRecipients(recipients);
Set<InternalDistributedMember> failures = distribution
.directChannelSend(recipients, m);
- assertTrue(failures == null);
+ assertThat(failures == null).isTrue();
verify(dc).send(any(), any(),
any(), anyLong(), anyLong());
}
@@ -126,9 +126,9 @@ public class DistributionTest {
when(dc.send(any(), any(mockMembers.getClass()),
any(DistributionMessage.class), anyLong(),
anyLong())).thenThrow(exception);
failures = distribution.directChannelSend(recipients, m);
- assertTrue(failures != null);
- assertEquals(1, failures.size());
- assertEquals(recipients.get(0), failures.iterator().next());
+ assertThat(failures != null).isTrue();
+ assertThat(failures).hasSize(1);
+ assertThat(failures.iterator().next()).isEqualTo(recipients.get(0));
}
@Test
@@ -154,10 +154,10 @@ public class DistributionTest {
HighPriorityAckedMessage m = new HighPriorityAckedMessage();
when(membership.getAllMembers(EMPTY_MEMBER_ARRAY)).thenReturn(mockMembers);
m.setRecipient(DistributionMessage.ALL_RECIPIENTS);
- assertTrue(m.forAll());
+ assertThat(m.forAll()).isTrue();
Set<InternalDistributedMember> failures = distribution
.directChannelSend(null, m);
- assertTrue(failures == null);
+ assertThat(failures == null).isTrue();
verify(dc).send(any(), isA(mockMembers.getClass()),
isA(DistributionMessage.class), anyLong(), anyLong());
}
@@ -188,8 +188,8 @@ public class DistributionTest {
Set<InternalDistributedMember> failures =
distribution.send(Collections.singletonList(mockMembers[0]), m);
verify(membership, never()).send(any(), any());
- assertEquals(1, failures.size());
- assertEquals(mockMembers[0], failures.iterator().next());
+ assertThat(failures).hasSize(1);
+ assertThat(failures.iterator().next()).isEqualTo(mockMembers[0]);
}
@Test
@@ -228,4 +228,13 @@ public class DistributionTest {
.isInstanceOf(SystemConnectException.class)
.hasCause(exception);
}
+
+ @Test
+ public void testMemberDestroyed() throws Exception {
+ distribution.destroyMember(mockMembers[0], null);
+ distribution.destroyMember(mockMembers[1], null);
+
+ verify(dc).scheduleCloseEndpoint(eq(mockMembers[0]), eq(null), eq(false));
+ verify(dc).scheduleCloseEndpoint(eq(mockMembers[1]), eq(null), eq(false));
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java
new file mode 100644
index 0000000000..cb197a8c95
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/direct/DirectChannelTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.direct;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Random;
+
+import org.jgroups.util.UUID;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.distributed.internal.membership.api.Membership;
+import org.apache.geode.distributed.internal.membership.api.MessageListener;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+
+public class DirectChannelTest {
+
+
+ private DirectChannel directChannel;
+ private InternalDistributedMember[] mockMembers;
+
+ Membership<InternalDistributedMember> mgr;
+ MessageListener<InternalDistributedMember> listener;
+ ClusterDistributionManager dm;
+
+ DistributionConfig dc;
+
+ /**
+ * Some tests require a DirectChannel mock
+ */
+ @Before
+ public void setUp() throws Exception {
+ listener = mock(MessageListener.class);
+ mgr = mock(Membership.class);
+ dm = mock(ClusterDistributionManager.class);
+ dc = mock(DistributionConfig.class);
+
+ Random r = new Random();
+ mockMembers = new InternalDistributedMember[5];
+ for (int i = 0; i < mockMembers.length; i++) {
+ mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i);
+ UUID uuid = new UUID(r.nextLong(), r.nextLong());
+ mockMembers[i].setUUID(uuid);
+ }
+ when(dm.getConfig()).thenReturn(dc);
+ when(dm.getSystem()).thenReturn(mock(InternalDistributedSystem.class));
+
+ int[] range = new int[2];
+ range[0] = 41000;
+ range[1] = 61000;
+ when(dc.getMembershipPortRange()).thenReturn(range);
+ SecurableCommunicationChannel[] sslEnabledComponent = new SecurableCommunicationChannel[1];
+ sslEnabledComponent[0] = SecurableCommunicationChannel.CLUSTER;
+
+ when(dc.getSecurableCommunicationChannels()).thenReturn(sslEnabledComponent);
+
+ SocketCreatorFactory.setDistributionConfig(dc);
+ directChannel = new DirectChannel(mgr, listener, dm);
+ }
+
+ @Test
+ public void testScheduleCloseEndpoint() throws Exception {
+ directChannel.scheduleCloseEndpoint(mockMembers[0], null, false);
+ directChannel.scheduleCloseEndpoint(mockMembers[1], null, false);
+ directChannel.scheduleCloseEndpoint(mockMembers[2], null, false);
+
+ assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(3);
+ }
+
+ @Test
+ public void testScheduleCloseEndpointAndClearAllAtDisconnect() throws Exception {
+ directChannel.scheduleCloseEndpoint(mockMembers[0], null, false);
+ directChannel.scheduleCloseEndpoint(mockMembers[1], null, false);
+ directChannel.scheduleCloseEndpoint(mockMembers[2], null, false);
+ directChannel.disconnect(null);
+
+ assertThat(directChannel.getCloseEndpointExecutorQueueSize()).isEqualTo(0);
+ }
+}