This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-7284
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 7f1ca0c85192b46919f40f9d6748e3d7922d7c8b
Author: Barry Oglesby <[email protected]>
AuthorDate: Wed Oct 9 17:15:28 2019 -0700

    GEODE-7284: Modified CacheClientProxy remoteHostAddress to be unique
---
 ...ientServerRemoteHostAddressDistributedTest.java | 117 +++++++++++++++++++++
 .../cache/tier/sockets/CacheClientNotifier.java    |   2 +
 .../cache/tier/sockets/CacheClientProxy.java       |   4 +-
 .../apache/geode/internal/net/SocketCloser.java    |   3 +-
 4 files changed, 123 insertions(+), 3 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerRemoteHostAddressDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerRemoteHostAddressDistributedTest.java
new file mode 100644
index 0000000..4f5c59a
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/ClientServerRemoteHostAddressDistributedTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.InternalCacheServer;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class ClientServerRemoteHostAddressDistributedTest implements 
Serializable {
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Test
+  public void testRemoteHostAddress() throws Exception {
+    // Start Locator
+    MemberVM locator = cluster.startLocatorVM(0);
+
+    // Start server
+    int locatorPort = locator.getPort();
+    String regionName = testName.getMethodName() + "_region";
+    MemberVM server = cluster.startServerVM(1, s -> 
s.withConnectionToLocator(locatorPort)
+        .withRegion(RegionShortcut.PARTITION, regionName));
+
+    // Connect client 1
+    ClientVM client1 =
+        cluster.startClientVM(2, 
getDurableClientProperties(testName.getMethodName() + "_1"),
+            (ccf) -> {
+              ccf.setPoolSubscriptionEnabled(true);
+              ccf.addPoolLocator("localhost", locatorPort);
+            });
+
+    // Connect client 2
+    ClientVM client2 =
+        cluster.startClientVM(3, 
getDurableClientProperties(testName.getMethodName() + "_2"),
+            (ccf) -> {
+              ccf.setPoolSubscriptionEnabled(true);
+              ccf.addPoolLocator("localhost", locatorPort);
+            });
+
+    // Invoke readyForEvents in both clients
+    client1.invoke(() -> readyForEvents());
+    client2.invoke(() -> readyForEvents());
+
+    // Verify CacheClientProxies have different remoteHostAddresses
+    server.invoke(() -> verifyRemoteHostAddresses());
+  }
+
+  protected Properties getDurableClientProperties(String durableClientId) {
+    Properties properties = new Properties();
+    properties.setProperty(DURABLE_CLIENT_ID, durableClientId);
+    return properties;
+  }
+
+  private void readyForEvents() {
+    ClusterStartupRule.clientCacheRule.getCache().readyForEvents();
+  }
+
+  private void verifyRemoteHostAddresses() {
+    verifyRemoteHostAddresses(getAcceptor(), 2);
+  }
+
+  private AcceptorImpl getAcceptor() {
+    Cache cache = ClusterStartupRule.getCache();
+    List<CacheServer> cacheServers = cache.getCacheServers();
+    CacheServer cacheServer = cacheServers.get(0);
+    return (AcceptorImpl) ((InternalCacheServer) cacheServer).getAcceptor();
+  }
+
+  private void verifyRemoteHostAddresses(AcceptorImpl acceptor, int 
expectedNumProxies) {
+    // Wait for the expected number of CacheClientProxies to be created which 
happens asynchronously
+    await().until(
+        () -> acceptor.getCacheClientNotifier().getClientProxies().size() == 
expectedNumProxies);
+
+    // Get their remoteHostAddresses
+    Collection<CacheClientProxy> proxies = 
acceptor.getCacheClientNotifier().getClientProxies();
+    Set<String> remoteHostAddresses =
+        proxies.stream().map(proxy -> 
proxy.getRemoteHostAddress()).collect(Collectors.toSet());
+
+    // Verify the expected number of remoteHostAddresses
+    assertThat(remoteHostAddresses.size()).isEqualTo(expectedNumProxies);
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 6ec4499..7395ea7 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -1564,6 +1564,8 @@ public class CacheClientNotifier {
       } catch (CancelException e) {
         throw e;
       } catch (Exception e) {
+        logger.warn("CacheClientNotifier: Caught exception attempting to close 
client: {}", proxy,
+            e);
       }
 
       // Remove the proxy if necessary. It might not be necessary to remove 
the proxy if it is
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 945f614..86529ad 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -345,7 +345,7 @@ public class CacheClientProxy implements ClientSession {
     this.statisticsClock = statisticsClock;
     this._statistics =
         new CacheClientProxyStats(factory, "id_" + 
this.proxyID.getDistributedMember().getId()
-            + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort());
+            + "_at_" + this._remoteHostAddress);
     this.subject = subject;
 
     // Create the interest list
@@ -418,7 +418,7 @@ public class CacheClientProxy implements ClientSession {
       }
       this._commBuffer = ServerConnection.allocateCommBuffer(bufSize, socket);
     }
-    this._remoteHostAddress = socket.getInetAddress().getHostAddress();
+    this._remoteHostAddress = socket.getInetAddress().getHostAddress() + ":" + 
socket.getPort();
     this.isPrimary = ip;
     this.clientConflation = cc;
     this.clientVersion = vers;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
index f083d50..4727c57 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCloser.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
@@ -205,7 +206,7 @@ public class SocketCloser {
       if (submittedTask != null) {
         waitForFutureTaskWithTimeout(submittedTask);
       }
-    } catch (OutOfMemoryError ignore) {
+    } catch (RejectedExecutionException | OutOfMemoryError ignore) {
       // If we can't start a thread to close the socket just do it inline.
       // See bug 50573.
       doItInline = true;

Reply via email to