This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-6164
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c88b7f127b24d892f30cd262acb78625c69381eb
Author: zhouxh <[email protected]>
AuthorDate: Thu Dec 6 23:24:09 2018 -0800

    GEODE-6164: CacheClientProxy's closeSocket should be called atomically
---
 .../cache/tier/sockets/CacheClientProxy.java       | 16 ++--
 .../cache/tier/sockets/CacheClientProxyTest.java   | 96 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 7e6fcfd..58f2dd2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -945,17 +945,23 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  private void closeSocket() {
-    if (this._socketClosed.compareAndSet(false, true)) {
+  private boolean closeSocket() {
+    String remoteHostAddress = this._remoteHostAddress;
+    if (this._socketClosed.compareAndSet(false, true) && remoteHostAddress != null) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
-          null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, remoteHostAddress, null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
+      return true;
     }
+    return false;
   }
 
   private void closeTransientFields() {
-    closeSocket();
+    if (!closeSocket()) {
+      // other thread who closed the socket will be responsible to
+      // releaseResourcesForAddress and clearClientInterestList
+      return;
+    }
 
     // Null out comm buffer, host address, ports and proxy id. All will be
     // replaced when the client reconnects.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
new file mode 100644
index 0000000..c0684e0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+public class CacheClientProxyTest {
+
+  @Rule
+  public ServerStarterRule serverRule =
+      new ServerStarterRule().withLogFile().withAutoStart();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @Test
+  public void closeSocketShouldBeAtomic() {
+
+    CacheServerStats stats = mock(CacheServerStats.class);
+    doNothing().when(stats).incCurrentQueueConnections();
+    InternalCache cache = serverRule.getCache();
+    CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+    SocketCloser sc = mock(SocketCloser.class);
+    when(ccn.getCache()).thenReturn(cache);
+    when(ccn.getAcceptorStats()).thenReturn(stats);
+    when(ccn.getSocketCloser()).thenReturn(sc);
+    Socket socket = mock(Socket.class);
+    InetAddress address = mock(InetAddress.class);
+    when(socket.getInetAddress()).thenReturn(address);
+    when(address.getHostAddress()).thenReturn("localhost");
+    doNothing().when(sc).asyncClose(any(), eq("localhost"), eq(null));
+
+    ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
+    DistributedMember member = cache.getDistributedSystem().getDistributedMember();
+    when(proxyID.getDistributedMember()).thenReturn(member);
+    CacheClientProxy proxy = new CacheClientProxy(ccn, socket, proxyID, true,
+        Handshake.CONFLATION_DEFAULT, Version.CURRENT, 1L, true,
+        null, null);
+    Future<Void> result1 = executorServiceRule.runAsync(() -> proxy.close());
+    Future<Void> result2 = executorServiceRule.runAsync(() -> proxy.close());
+    Future<Void> result3 = executorServiceRule.runAsync(() -> proxy.close());
+    ((CompletableFuture<Void>) result1).join();
+    ((CompletableFuture<Void>) result2).join();
+    ((CompletableFuture<Void>) result3).join();
+    assertThatCode(() -> result1.get(2, SECONDS)).doesNotThrowAnyException();
+    assertThatCode(() -> result2.get(2, SECONDS)).doesNotThrowAnyException();
+    assertThatCode(() -> result3.get(2, SECONDS)).doesNotThrowAnyException();
+    verify(ccn, times(2)).getSocketCloser();
+    assertNull(proxy._remoteHostAddress);
+  }
+
+  @Test
+  public void closeSocket1000Times() {
+    // run it for 1000 times to introduce conflicts between threads
+    for (int i = 0; i < 1000; i++) {
+      closeSocketShouldBeAtomic();
+    }
+  }
+
+}

Reply via email to