This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-6164 in repository https://gitbox.apache.org/repos/asf/geode.git
commit c88b7f127b24d892f30cd262acb78625c69381eb
Author: zhouxh <[email protected]>
AuthorDate: Thu Dec 6 23:24:09 2018 -0800

    GEODE-6164: CacheClientProxy's closeSocket should be called atomically
---
 .../cache/tier/sockets/CacheClientProxy.java       | 16 ++--
 .../cache/tier/sockets/CacheClientProxyTest.java   | 96 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 5 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 7e6fcfd..58f2dd2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -945,17 +945,23 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
-  private void closeSocket() {
-    if (this._socketClosed.compareAndSet(false, true)) {
+  private boolean closeSocket() {
+    String remoteHostAddress = this._remoteHostAddress;
+    if (this._socketClosed.compareAndSet(false, true) && remoteHostAddress != null) {
       // Close the socket
-      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
-          null);
+      this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, remoteHostAddress, null);
       getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
+      return true;
     }
+    return false;
   }
 
   private void closeTransientFields() {
-    closeSocket();
+    if (!closeSocket()) {
+      // other thread who closed the socket will be responsible to
+      // releaseResourcesForAddress and clearClientInterestList
+      return;
+    }
 
     // Null out comm buffer, host address, ports and proxy id. All will be
     // replaced when the client reconnects.
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
new file mode 100644
index 0000000..c0684e0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxyTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.junit.Assert.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCloser;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.ServerStarterRule;
+
+public class CacheClientProxyTest {
+
+  @Rule
+  public ServerStarterRule serverRule =
+      new ServerStarterRule().withLogFile().withAutoStart();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @Test
+  public void closeSocketShouldBeAtomic() {
+
+    CacheServerStats stats = mock(CacheServerStats.class);
+    doNothing().when(stats).incCurrentQueueConnections();
+    InternalCache cache = serverRule.getCache();
+    CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+    SocketCloser sc = mock(SocketCloser.class);
+    when(ccn.getCache()).thenReturn(cache);
+    when(ccn.getAcceptorStats()).thenReturn(stats);
+    when(ccn.getSocketCloser()).thenReturn(sc);
+    Socket socket = mock(Socket.class);
+    InetAddress address = mock(InetAddress.class);
+    when(socket.getInetAddress()).thenReturn(address);
+    when(address.getHostAddress()).thenReturn("localhost");
+    doNothing().when(sc).asyncClose(any(), eq("localhost"), eq(null));
+
+    ClientProxyMembershipID proxyID = mock(ClientProxyMembershipID.class);
+    DistributedMember member = cache.getDistributedSystem().getDistributedMember();
+    when(proxyID.getDistributedMember()).thenReturn(member);
+    CacheClientProxy proxy = new CacheClientProxy(ccn, socket, proxyID, true,
+        Handshake.CONFLATION_DEFAULT, Version.CURRENT, 1L, true,
+        null, null);
+    Future<Void> result1 = executorServiceRule.runAsync(() -> proxy.close());
+    Future<Void> result2 = executorServiceRule.runAsync(() -> proxy.close());
+    Future<Void> result3 = executorServiceRule.runAsync(() -> proxy.close());
+    ((CompletableFuture<Void>) result1).join();
+    ((CompletableFuture<Void>) result2).join();
+    ((CompletableFuture<Void>) result3).join();
+    assertThatCode(() -> result1.get(2, SECONDS)).doesNotThrowAnyException();
+    assertThatCode(() -> result2.get(2, SECONDS)).doesNotThrowAnyException();
+    assertThatCode(() -> result3.get(2, SECONDS)).doesNotThrowAnyException();
+    verify(ccn, times(2)).getSocketCloser();
+    assertNull(proxy._remoteHostAddress);
+  }
+
+  @Test
+  public void closeSocket1000Times() {
+    // run it for 1000 times to introduce conflicts between threads
+    for (int i = 0; i < 1000; i++) {
+      closeSocketShouldBeAtomic();
+    }
+  }
+
+}
