This is an automated email from the ASF dual-hosted git repository. boglesby pushed a commit to branch feature/GEODE-7284 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7f1ca0c85192b46919f40f9d6748e3d7922d7c8b Author: Barry Oglesby <[email protected]> AuthorDate: Wed Oct 9 17:15:28 2019 -0700 GEODE-7284: Modified CacheClientProxy remoteHostAddress to be unique --- ...ientServerRemoteHostAddressDistributedTest.java | 117 +++++++++++++++++++++ .../cache/tier/sockets/CacheClientNotifier.java | 2 + .../cache/tier/sockets/CacheClientProxy.java | 4 +- .../apache/geode/internal/net/SocketCloser.java | 3 +- 4 files changed, 123 insertions(+), 3 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerRemoteHostAddressDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerRemoteHostAddressDistributedTest.java new file mode 100644 index 0000000..4f5c59a --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerRemoteHostAddressDistributedTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.tier.sockets; + +import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.internal.cache.InternalCacheServer; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +public class ClientServerRemoteHostAddressDistributedTest implements Serializable { + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(); + + @Rule + public SerializableTestName testName = new SerializableTestName(); + + @Test + public void testRemoteHostAddress() throws Exception { + // Start Locator + MemberVM locator = cluster.startLocatorVM(0); + + // Start server + int locatorPort = locator.getPort(); + String regionName = testName.getMethodName() + "_region"; + MemberVM server = cluster.startServerVM(1, s -> s.withConnectionToLocator(locatorPort) + .withRegion(RegionShortcut.PARTITION, regionName)); + + // Connect client 1 + ClientVM client1 = + cluster.startClientVM(2, getDurableClientProperties(testName.getMethodName() + "_1"), + (ccf) -> { + ccf.setPoolSubscriptionEnabled(true); + ccf.addPoolLocator("localhost", locatorPort); + }); + + // Connect client 2 + ClientVM client2 = + cluster.startClientVM(3, getDurableClientProperties(testName.getMethodName() + "_2"), + (ccf) -> { + ccf.setPoolSubscriptionEnabled(true); + ccf.addPoolLocator("localhost", locatorPort); + }); + + // Invoke readyForEvents in both clients + client1.invoke(() -> readyForEvents()); + client2.invoke(() -> readyForEvents()); + + // Verify CacheClientProxies have different remoteHostAddresses + server.invoke(() -> verifyRemoteHostAddresses()); + } + + protected Properties getDurableClientProperties(String durableClientId) { + Properties properties = new Properties(); + properties.setProperty(DURABLE_CLIENT_ID, durableClientId); + return properties; + } + + private void readyForEvents() { + ClusterStartupRule.clientCacheRule.getCache().readyForEvents(); + } + + private void verifyRemoteHostAddresses() { + verifyRemoteHostAddresses(getAcceptor(), 2); + } + + private AcceptorImpl getAcceptor() { + Cache cache = ClusterStartupRule.getCache(); + List<CacheServer> cacheServers = cache.getCacheServers(); + CacheServer cacheServer = cacheServers.get(0); + return (AcceptorImpl) ((InternalCacheServer) cacheServer).getAcceptor(); + } + + private void verifyRemoteHostAddresses(AcceptorImpl acceptor, int expectedNumProxies) { + // Wait for the expected number of CacheClientProxies to be created which happens asynchronously + await().until( + () -> acceptor.getCacheClientNotifier().getClientProxies().size() == expectedNumProxies); + + // Get their remoteHostAddresses + Collection<CacheClientProxy> proxies = acceptor.getCacheClientNotifier().getClientProxies(); + Set<String> remoteHostAddresses = + proxies.stream().map(proxy -> proxy.getRemoteHostAddress()).collect(Collectors.toSet()); + + // Verify the expected number of remoteHostAddresses + assertThat(remoteHostAddresses.size()).isEqualTo(expectedNumProxies); + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index 6ec4499..7395ea7 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -1564,6 +1564,8 @@ public class CacheClientNotifier { } catch (CancelException e) { throw e; } catch (Exception e) { + logger.warn("CacheClientNotifier: Caught exception attempting to close client: {}", proxy, + e); } // Remove the proxy if necessary. It might not be necessary to remove the proxy if it is diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java index 945f614..86529ad 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java @@ -345,7 +345,7 @@ public class CacheClientProxy implements ClientSession { this.statisticsClock = statisticsClock; this._statistics = new CacheClientProxyStats(factory, "id_" + this.proxyID.getDistributedMember().getId() - + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort()); + + "_at_" + this._remoteHostAddress); this.subject = subject; // Create the interest list @@ -418,7 +418,7 @@ public class CacheClientProxy implements ClientSession { } this._commBuffer = ServerConnection.allocateCommBuffer(bufSize, socket); } - this._remoteHostAddress = socket.getInetAddress().getHostAddress(); + this._remoteHostAddress = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); this.isPrimary = ip; this.clientConflation = cc; this.clientVersion = vers; diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java index f083d50..4727c57 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java @@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; @@ -205,7 +206,7 @@ public class SocketCloser { if (submittedTask != null) { waitForFutureTaskWithTimeout(submittedTask); } - } catch (OutOfMemoryError ignore) { + } catch (RejectedExecutionException | OutOfMemoryError ignore) { // If we can't start a thread to close the socket just do it inline. // See bug 50573. doItInline = true;
