This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6326e7a  GEODE-9573: Uses mutable list for connections.
6326e7a is described below

commit 6326e7a67d48e9941ef2b2cbd578a1111224002f
Author: Jacob Barrett <[email protected]>
AuthorDate: Wed Sep 1 14:33:25 2021 -0700

    GEODE-9573: Uses mutable list for connections.
    
    Updates API with null annotations.
    Updates Javadocs with comments on mutable collections.
    New test to assert that DirectReplySender always uses a mutable connection list.
---
 .../internal/ClusterDistributionManager.java       |  5 ++-
 .../distributed/internal/DistributionManager.java  |  6 ++-
 .../internal/LonerDistributionManager.java         |  6 ++-
 .../geode/distributed/internal/ReplySender.java    | 11 +++++-
 .../geode/internal/tcp/DirectReplySender.java      | 46 ++++++++++++++--------
 .../geode/internal/tcp/DirectReplySenderTest.java  | 43 ++++++++++++++++++++
 6 files changed, 96 insertions(+), 21 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 9e47a40..237988b 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -39,6 +39,8 @@ import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
@@ -1076,7 +1078,8 @@ public class ClusterDistributionManager implements DistributionManager {
   }
 
   @Override
-  public Set<InternalDistributedMember> putOutgoing(final DistributionMessage 
msg) {
+  @Nullable
+  public Set<InternalDistributedMember> putOutgoing(final @NotNull 
DistributionMessage msg) {
     try {
       DistributionMessageObserver observer = 
DistributionMessageObserver.getInstance();
       if (observer != null) {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index d544729..696f729 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -20,6 +20,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.admin.GemFireHealthConfig;
 import org.apache.geode.alerting.internal.api.AlertingService;
@@ -173,7 +176,8 @@ public interface DistributionManager extends ReplySender {
    * @return recipients who did not receive the message
    */
   @Override
-  Set<InternalDistributedMember> putOutgoing(DistributionMessage msg);
+  @Nullable
+  Set<InternalDistributedMember> putOutgoing(@NotNull DistributionMessage msg);
 
   /**
    * Returns the distributed system to which this distribution manager is 
connected.
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index 6e4d9fc..414cd8d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -27,6 +27,9 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.admin.GemFireHealthConfig;
@@ -1098,7 +1101,8 @@ public class LonerDistributionManager implements DistributionManager {
   }
 
   @Override
-  public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) {
+  @Nullable
+  public Set<InternalDistributedMember> putOutgoing(@NotNull 
DistributionMessage msg) {
     return null;
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
index a245164..5fbefe1 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ReplySender.java
@@ -16,6 +16,9 @@ package org.apache.geode.distributed.internal;
 
 import java.util.Set;
 
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DirectReplyMessage;
 
@@ -31,6 +34,12 @@ import org.apache.geode.internal.cache.DirectReplyMessage;
  */
 public interface ReplySender {
 
-  Set<InternalDistributedMember> putOutgoing(DistributionMessage msg);
+
+  /**
+   * @param message to send.
+   * @return recipients who did not receive the message. May return an 
immutable set.
+   */
+  @Nullable
+  Set<InternalDistributedMember> putOutgoing(@NotNull DistributionMessage 
message);
 
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index e5b232e..b0ba75a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -14,14 +14,18 @@
  */
 package org.apache.geode.internal.tcp;
 
-import static java.util.Collections.singletonList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+import static org.apache.geode.internal.Assert.assertTrue;
 
 import java.io.IOException;
 import java.io.NotSerializableException;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
 
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.annotations.Immutable;
@@ -30,13 +34,11 @@ import org.apache.geode.distributed.internal.DistributionMessage;
 import 
org.apache.geode.distributed.internal.LonerDistributionManager.DummyDMStats;
 import org.apache.geode.distributed.internal.ReplySender;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * A reply sender which replies back directly to a dedicated socket socket.
- *
  */
 class DirectReplySender implements ReplySender {
   private static final Logger logger = LogService.getLogger();
@@ -44,18 +46,18 @@ class DirectReplySender implements ReplySender {
   @Immutable
   private static final DMStats DUMMY_STATS = new DummyDMStats();
 
-  private final Connection connection;
+  private final @NotNull Connection connection;
+
   private boolean sentReply = false;
 
-  public DirectReplySender(Connection connection) {
+  public DirectReplySender(@NotNull Connection connection) {
     this.connection = connection;
   }
 
   @Override
-  public Set<InternalDistributedMember> putOutgoing(DistributionMessage msg) {
-    Assert.assertTrue(!sentReply, "Trying to reply twice to a message");
-    // Using an ArrayList, rather than Collections.singletonList here, because 
the MsgStreamer
-    // mutates the list when it has exceptions.
+  @NotNull
+  public Set<InternalDistributedMember> putOutgoing(@NotNull final 
DistributionMessage msg) {
+    assertTrue(!sentReply, "Trying to reply twice to a message");
 
     
connection.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
 
@@ -63,19 +65,19 @@ class DirectReplySender implements ReplySender {
       logger.trace(LogMarker.DM_VERBOSE, "Sending a direct reply {} to {}", 
msg,
           connection.getRemoteAddress());
     }
-    MsgStreamer ms = (MsgStreamer) 
MsgStreamer.create(singletonList(connection), msg, false,
+    final MsgStreamer ms = (MsgStreamer) MsgStreamer.create(getConnections(), 
msg, false,
         DUMMY_STATS, connection.getBufferPool());
     try {
       ms.writeMessage();
-      ConnectExceptions ce = ms.getConnectExceptions();
+      final ConnectExceptions ce = ms.getConnectExceptions();
       if (ce != null && !ce.getMembers().isEmpty()) {
-        Assert.assertTrue(ce.getMembers().size() == 1);
-        logger.warn("Failed sending a direct reply to {}",
-            ce.getMembers().iterator().next());
-        return Collections.singleton(ce.getMembers().iterator().next());
+        assertTrue(ce.getMembers().size() == 1);
+        final InternalDistributedMember member = ce.getMembers().get(0);
+        logger.warn("Failed sending a direct reply to {}", member);
+        return singleton(member);
       }
       sentReply = true;
-      return Collections.emptySet();
+      return emptySet();
     } catch (NotSerializableException e) {
       throw new InternalGemFireException(e);
     } catch (IOException ex) {
@@ -91,4 +93,14 @@ class DirectReplySender implements ReplySender {
 
   }
 
+  /**
+   * @return a mutable {@link List} for mutation by {@link MsgStreamer} upon 
exception.
+   */
+  @NotNull
+  List<Connection> getConnections() {
+    final ArrayList<Connection> connections = new ArrayList<>(1);
+    connections.add(connection);
+    return connections;
+  }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/DirectReplySenderTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/DirectReplySenderTest.java
new file mode 100644
index 0000000..284be09
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/DirectReplySenderTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.mockito.Mockito.mock;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Test;
+
+public class DirectReplySenderTest {
+
+  @Test
+  public void getConnectionsReturnsMutableListOfOne() {
+    final Connection connection = mock(Connection.class);
+    final DirectReplySender directReplySender = new 
DirectReplySender(connection);
+    final List<Connection> connections = directReplySender.getConnections();
+    assertThat(connections).containsExactly(connection);
+    assertThatNoException().isThrownBy(() -> {
+      final Iterator<Connection> iterator = connections.iterator();
+      iterator.next();
+      iterator.remove();
+    });
+    assertThat(connections).isEmpty();
+  }
+
+}

Reply via email to