This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 6326e7a GEODE-9573: Uses mutable list for connections.
6326e7a is described below
commit 6326e7a67d48e9941ef2b2cbd578a1111224002f
Author: Jacob Barrett <[email protected]>
AuthorDate: Wed Sep 1 14:33:25 2021 -0700
GEODE-9573: Uses mutable list for connections.
Updates API with null annotations.
Updates Javadocs with comments on mutable collections.
New test to assert that DirectReplySender always uses a mutable connection
list.
---
.../internal/ClusterDistributionManager.java | 5 ++-
.../distributed/internal/DistributionManager.java | 6 ++-
.../internal/LonerDistributionManager.java | 6 ++-
.../geode/distributed/internal/ReplySender.java | 11 +++++-
.../geode/internal/tcp/DirectReplySender.java | 46 ++++++++++++++--------
.../geode/internal/tcp/DirectReplySenderTest.java | 43 ++++++++++++++++++++
6 files changed, 96 insertions(+), 21 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 9e47a40..237988b 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -39,6 +39,8 @@ import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
@@ -1076,7 +1078,8 @@ public class ClusterDistributionManager implements
DistributionManager {
}
@Override
- public Set<InternalDistributedMember> putOutgoing(final DistributionMessage
msg) {
+ @Nullable
+ public Set<InternalDistributedMember> putOutgoing(final @NotNull
DistributionMessage msg) {
try {
DistributionMessageObserver observer =
DistributionMessageObserver.getInstance();
if (observer != null) {
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index d544729..696f729 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -20,6 +20,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
import org.apache.geode.CancelCriterion;
import org.apache.geode.admin.GemFireHealthConfig;
import org.apache.geode.alerting.internal.api.AlertingService;
@@ -173,7 +176,8 @@ public interface DistributionManager extends ReplySender {
* @return recipients who did not receive the message
*/
@Override
- Set<InternalDistributedMember> putOutgoing(DistributionMessage msg);
+ @Nullable
+ Set<InternalDistributedMember> putOutgoing(@NotNull DistributionMessage msg);
/**
* Returns the distributed system to which this distribution manager is
connected.
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 6e4d9fc..414cd8d 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -27,6 +27,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
import org.apache.geode.CancelCriterion;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.admin.GemFireHealthConfig;
@@ -1098,7 +1101,8 @@ public class LonerDistributionManager implements
DistributionManager {
}
@Override
- public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) {
+ @Nullable
+ public Set<InternalDistributedMember> putOutgoing(@NotNull
DistributionMessage msg) {
return null;
}
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
index a245164..5fbefe1 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
@@ -16,6 +16,9 @@ package org.apache.geode.distributed.internal;
import java.util.Set;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DirectReplyMessage;
@@ -31,6 +34,12 @@ import org.apache.geode.internal.cache.DirectReplyMessage;
*/
public interface ReplySender {
- Set<InternalDistributedMember> putOutgoing(DistributionMessage msg);
+
+ /**
+ * @param message to send.
+ * @return recipients who did not receive the message. May return an
immutable set.
+ */
+ @Nullable
+ Set<InternalDistributedMember> putOutgoing(@NotNull DistributionMessage
message);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index e5b232e..b0ba75a 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -14,14 +14,18 @@
*/
package org.apache.geode.internal.tcp;
-import static java.util.Collections.singletonList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static org.apache.geode.internal.Assert.assertTrue;
import java.io.IOException;
import java.io.NotSerializableException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.Immutable;
@@ -30,13 +34,11 @@ import
org.apache.geode.distributed.internal.DistributionMessage;
import
org.apache.geode.distributed.internal.LonerDistributionManager.DummyDMStats;
import org.apache.geode.distributed.internal.ReplySender;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* A reply sender which replies back directly to a dedicated socket socket.
- *
*/
class DirectReplySender implements ReplySender {
private static final Logger logger = LogService.getLogger();
@@ -44,18 +46,18 @@ class DirectReplySender implements ReplySender {
@Immutable
private static final DMStats DUMMY_STATS = new DummyDMStats();
- private final Connection connection;
+ private final @NotNull Connection connection;
+
private boolean sentReply = false;
- public DirectReplySender(Connection connection) {
+ public DirectReplySender(@NotNull Connection connection) {
this.connection = connection;
}
@Override
- public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) {
- Assert.assertTrue(!sentReply, "Trying to reply twice to a message");
- // Using an ArrayList, rather than Collections.singletonList here, because
the MsgStreamer
- // mutates the list when it has exceptions.
+ @NotNull
+ public Set<InternalDistributedMember> putOutgoing(@NotNull final
DistributionMessage msg) {
+ assertTrue(!sentReply, "Trying to reply twice to a message");
connection.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
@@ -63,19 +65,19 @@ class DirectReplySender implements ReplySender {
logger.trace(LogMarker.DM_VERBOSE, "Sending a direct reply {} to {}",
msg,
connection.getRemoteAddress());
}
- MsgStreamer ms = (MsgStreamer)
MsgStreamer.create(singletonList(connection), msg, false,
+ final MsgStreamer ms = (MsgStreamer) MsgStreamer.create(getConnections(),
msg, false,
DUMMY_STATS, connection.getBufferPool());
try {
ms.writeMessage();
- ConnectExceptions ce = ms.getConnectExceptions();
+ final ConnectExceptions ce = ms.getConnectExceptions();
if (ce != null && !ce.getMembers().isEmpty()) {
- Assert.assertTrue(ce.getMembers().size() == 1);
- logger.warn("Failed sending a direct reply to {}",
- ce.getMembers().iterator().next());
- return Collections.singleton(ce.getMembers().iterator().next());
+ assertTrue(ce.getMembers().size() == 1);
+ final InternalDistributedMember member = ce.getMembers().get(0);
+ logger.warn("Failed sending a direct reply to {}", member);
+ return singleton(member);
}
sentReply = true;
- return Collections.emptySet();
+ return emptySet();
} catch (NotSerializableException e) {
throw new InternalGemFireException(e);
} catch (IOException ex) {
@@ -91,4 +93,14 @@ class DirectReplySender implements ReplySender {
}
+ /**
+ * @return a mutable {@link List} for mutation by {@link MsgStreamer} upon
exception.
+ */
+ @NotNull
+ List<Connection> getConnections() {
+ final ArrayList<Connection> connections = new ArrayList<>(1);
+ connections.add(connection);
+ return connections;
+ }
+
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/tcp/DirectReplySenderTest.java
b/geode-core/src/test/java/org/apache/geode/internal/tcp/DirectReplySenderTest.java
new file mode 100644
index 0000000..284be09
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/tcp/DirectReplySenderTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.mockito.Mockito.mock;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Test;
+
+public class DirectReplySenderTest {
+
+ @Test
+ public void getConnectionsReturnsMutableListOfOne() {
+ final Connection connection = mock(Connection.class);
+ final DirectReplySender directReplySender = new
DirectReplySender(connection);
+ final List<Connection> connections = directReplySender.getConnections();
+ assertThat(connections).containsExactly(connection);
+ assertThatNoException().isThrownBy(() -> {
+ final Iterator<Connection> iterator = connections.iterator();
+ iterator.next();
+ iterator.remove();
+ });
+ assertThat(connections).isEmpty();
+ }
+
+}