This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8aff924  GEODE-5261 - Add a test for connection-pool prefill during shutdown
8aff924 is described below

commit 8aff9249510a7e30bfbcf2ed03f191357b0dbf9a
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Tue May 29 15:18:19 2018 -0700

    GEODE-5261 - Add a test for connection-pool prefill during shutdown
    
    Refactored ConnectionFactoryImpl to allow it to be unit tested.
    Added a unit test to prove that createClientToServerConnection(Set) throws a
    CancelException if the pool is shutting down.
---
 .../cache/client/internal/ConnectionConnector.java | 106 +++++++++++++++++++++
 .../client/internal/ConnectionFactoryImpl.java     |  82 ++++------------
 .../internal/ConnectionFactoryJUnitTest.java       |  78 +++++++++++++++
 3 files changed, 201 insertions(+), 65 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionConnector.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionConnector.java
new file mode 100644
index 0000000..2ef35da
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionConnector.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import java.io.IOException;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.tier.ClientSideHandshake;
+import org.apache.geode.internal.cache.tier.CommunicationMode;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientUpdater;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+
+public class ConnectionConnector {
+  private final ClientSideHandshakeImpl handshake;
+  private final int socketBufferSize;
+  private final int handshakeTimeout;
+  private final boolean usedByGateway;
+  private final CancelCriterion cancelCriterion;
+  private final SocketCreator socketCreator;
+  private int readTimeout;
+  private InternalDistributedSystem ds;
+  private EndpointManager endpointManager;
+  private GatewaySender gatewaySender;
+
+  public ConnectionConnector(EndpointManager endpointManager, InternalDistributedSystem sys,
+      int socketBufferSize, int handshakeTimeout, int readTimeout, ClientProxyMembershipID proxyId,
+      CancelCriterion cancelCriterion, boolean usedByGateway, GatewaySender sender,
+      boolean multiuserSecureMode) {
+
+    this.handshake =
+        new ClientSideHandshakeImpl(proxyId, sys, sys.getSecurityService(), multiuserSecureMode);
+    this.handshake.setClientReadTimeout(readTimeout);
+    this.endpointManager = endpointManager;
+    this.ds = sys;
+    this.socketBufferSize = socketBufferSize;
+    this.handshakeTimeout = handshakeTimeout;
+    this.readTimeout = readTimeout;
+    this.usedByGateway = usedByGateway;
+    this.gatewaySender = sender;
+    this.cancelCriterion = cancelCriterion;
+    if (this.usedByGateway || (this.gatewaySender != null)) {
+      this.socketCreator =
+          SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY);
+      if (sender != null && !sender.getGatewayTransportFilters().isEmpty()) {
+        this.socketCreator.initializeTransportFilterClientSocketFactory(sender);
+      }
+    } else {
+      // If configured use SSL properties for cache-server
+      this.socketCreator =
+          SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.SERVER);
+    }
+  }
+
+  public ConnectionImpl connectClientToServer(ServerLocation location, boolean forQueue)
+      throws IOException {
+    ConnectionImpl connection = new ConnectionImpl(this.ds, this.cancelCriterion);
+    ClientSideHandshake connHandShake = new ClientSideHandshakeImpl(handshake);
+    connection.connect(endpointManager, location, connHandShake, socketBufferSize, handshakeTimeout,
+        readTimeout, getCommMode(forQueue), this.gatewaySender, this.socketCreator);
+    connection.setHandshake(connHandShake);
+    return connection;
+  }
+
+  public CacheClientUpdater connectServerToClient(Endpoint endpoint, QueueManager qManager,
+      boolean isPrimary, ClientUpdater failedUpdater, String clientUpdateName) {
+    CacheClientUpdater updater = new CacheClientUpdater(clientUpdateName, endpoint.getLocation(),
+        isPrimary, ds, new ClientSideHandshakeImpl(this.handshake), qManager, endpointManager,
+        endpoint, handshakeTimeout, this.socketCreator);
+
+    if (!updater.isConnected()) {
+      return null;
+    }
+
+    updater.setFailedUpdater(failedUpdater);
+    updater.start();
+    return updater;
+  }
+
+  private CommunicationMode getCommMode(boolean forQueue) {
+    if (this.usedByGateway || (this.gatewaySender != null)) {
+      return CommunicationMode.GatewayToGateway;
+    } else if (forQueue) {
+      return CommunicationMode.ClientToServerForQueue;
+    } else {
+      return CommunicationMode.ClientToServer;
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
index eba0b0a..51857d3 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionFactoryImpl.java
@@ -29,16 +29,11 @@ import org.apache.geode.cache.client.internal.ServerBlackList.FailureTracker;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.tier.ClientSideHandshake;
-import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientUpdater;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.net.SocketCreator;
-import org.apache.geode.internal.net.SocketCreatorFactory;
-import org.apache.geode.internal.security.SecurableCommunicationChannel;
 import org.apache.geode.security.GemFireSecurityException;
 
 /**
@@ -54,19 +49,11 @@ public class ConnectionFactoryImpl implements ConnectionFactory {
   // TODO - GEODE-1746, the handshake holds state. It seems like the code depends
   // on all of the handshake operations happening in a single thread. I don't think we
   // want that, need to refactor.
-  private final ClientSideHandshakeImpl handshake;
-  private final int socketBufferSize;
-  private final int handshakeTimeout;
-  private final boolean usedByGateway;
   private final ServerBlackList blackList;
-  private final CancelCriterion cancelCriterion;
-  private final SocketCreator socketCreator;
   private ConnectionSource source;
-  private int readTimeout;
-  private InternalDistributedSystem ds;
-  private EndpointManager endpointManager;
-  private GatewaySender gatewaySender;
   private PoolImpl pool;
+  private final CancelCriterion cancelCriterion;
+  private final ConnectionConnector connectionConnector;
 
   /**
    * Test hook for client version support
@@ -80,67 +67,40 @@ public class ConnectionFactoryImpl implements ConnectionFactory {
       InternalDistributedSystem sys, int socketBufferSize, int handshakeTimeout, int readTimeout,
       ClientProxyMembershipID proxyId, CancelCriterion cancelCriterion, boolean usedByGateway,
       GatewaySender sender, long pingInterval, boolean multiuserSecureMode, PoolImpl pool) {
-    this.handshake =
-        new ClientSideHandshakeImpl(proxyId, sys, sys.getSecurityService(), multiuserSecureMode);
-    this.handshake.setClientReadTimeout(readTimeout);
+    this(
+        new ConnectionConnector(endpointManager, sys, socketBufferSize, handshakeTimeout,
+            readTimeout, proxyId, cancelCriterion, usedByGateway, sender, multiuserSecureMode),
+        source, pingInterval, pool, cancelCriterion);
+  }
+
+  public ConnectionFactoryImpl(ConnectionConnector connectionConnector, ConnectionSource source,
+      long pingInterval, PoolImpl pool, CancelCriterion cancelCriterion) {
     this.source = source;
-    this.endpointManager = endpointManager;
-    this.ds = sys;
-    this.socketBufferSize = socketBufferSize;
-    this.handshakeTimeout = handshakeTimeout;
-    this.readTimeout = readTimeout;
-    this.usedByGateway = usedByGateway;
-    this.gatewaySender = sender;
     this.blackList = new ServerBlackList(pingInterval);
-    this.cancelCriterion = cancelCriterion;
     this.pool = pool;
-    if (this.usedByGateway || (this.gatewaySender != null)) {
-      this.socketCreator =
-          SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY);
-      if (sender != null && !sender.getGatewayTransportFilters().isEmpty()) {
-        this.socketCreator.initializeTransportFilterClientSocketFactory(sender);
-      }
-    } else {
-      // If configured use SSL properties for cache-server
-      this.socketCreator =
-          SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.SERVER);
-    }
+    this.cancelCriterion = cancelCriterion;
+    this.connectionConnector = connectionConnector;
   }
 
   public void start(ScheduledExecutorService background) {
     blackList.start(background);
   }
 
-  private CommunicationMode getCommMode(boolean forQueue) {
-    if (this.usedByGateway || (this.gatewaySender != null)) {
-      return CommunicationMode.GatewayToGateway;
-    } else if (forQueue) {
-      return CommunicationMode.ClientToServerForQueue;
-    } else {
-      return CommunicationMode.ClientToServer;
-    }
-  }
-
   public ServerBlackList getBlackList() {
     return blackList;
   }
 
   public Connection createClientToServerConnection(ServerLocation location, boolean forQueue)
       throws GemFireSecurityException {
-    ConnectionImpl connection = new ConnectionImpl(this.ds, this.cancelCriterion);
     FailureTracker failureTracker = blackList.getFailureTracker(location);
 
     boolean initialized = false;
-
+    Connection connection = null;
     try {
-      ClientSideHandshake connHandShake = new ClientSideHandshakeImpl(handshake);
-      connection.connect(endpointManager, location, connHandShake, socketBufferSize,
-          handshakeTimeout, readTimeout, getCommMode(forQueue), this.gatewaySender,
-          this.socketCreator);
+      connection = connectionConnector.connectClientToServer(location, forQueue);
+      initialized = true;
       failureTracker.reset();
-      connection.setHandshake(connHandShake);
       authenticateIfRequired(connection);
-      initialized = true;
     } catch (GemFireConfigException e) {
       throw e;
     } catch (CancelException e) {
@@ -291,16 +251,8 @@ public class ConnectionFactoryImpl implements ConnectionFactory {
       logger.debug("Establishing: {}", clientUpdateName);
     }
     // Launch the thread
-    CacheClientUpdater updater = new CacheClientUpdater(clientUpdateName, endpoint.getLocation(),
-        isPrimary, ds, new ClientSideHandshakeImpl(this.handshake), qManager, endpointManager,
-        endpoint, handshakeTimeout, this.socketCreator);
-
-    if (!updater.isConnected()) {
-      return null;
-    }
-
-    updater.setFailedUpdater(failedUpdater);
-    updater.start();
+    CacheClientUpdater updater = connectionConnector.connectServerToClient(endpoint, qManager,
+        isPrimary, failedUpdater, clientUpdateName);
 
     return updater;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ConnectionFactoryJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ConnectionFactoryJUnitTest.java
new file mode 100644
index 0000000..1e8ce09
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ConnectionFactoryJUnitTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.distributed.PoolCancelledException;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ConnectionFactoryJUnitTest {
+
+  @Before
+  public void setUp() throws Exception {}
+
+  @After
+  public void tearDown() throws Exception {
+    SocketCreatorFactory.close();
+  }
+
+  @Test(expected = CancelException.class)
+  public void connectionFactoryThrowsCancelException() throws CancelException, IOException {
+    ServerLocation serverLocation = mock(ServerLocation.class);
+    doReturn(false).when(serverLocation).getRequiresCredentials();
+
+    ConnectionConnector connector = mock(ConnectionConnector.class);
+    doReturn(mock(ConnectionImpl.class)).when(connector).connectClientToServer(serverLocation,
+        false);
+
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    doReturn(serverLocation).when(connectionSource).findServer(any(Set.class));
+
+    // mocks don't seem to work well with CancelCriterion so let's create a real one
+    CancelCriterion cancelCriterion = new CancelCriterion() {
+      @Override
+      public String cancelInProgress() {
+        return "shutting down for test";
+      }
+
+      @Override
+      public RuntimeException generateCancelledException(Throwable throwable) {
+        return new PoolCancelledException(cancelInProgress(), throwable);
+      }
+    };
+
+    ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl(connector, connectionSource,
+        60000, mock(PoolImpl.class), cancelCriterion);
+        60000, mock(PoolImpl.class), cancelCriterion);
+
+    connectionFactory.createClientToServerConnection(Collections.emptySet());
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to