This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 0187e8cd4fb50711cec77ad0e5459c8cd9a64a7a Author: kohlmu-pivotal <[email protected]> AuthorDate: Mon Nov 6 17:37:21 2017 -0800 GEODE-3637: Reimplement client queue initialization. Adding shutdown logic --- .../internal/cache/tier/sockets/AcceptorImpl.java | 314 ++++++++++++--------- .../cache/tier/sockets/ServerConnection.java | 51 ++-- .../sockets/AcceptorImplClientQueueDUnitTest.java | 263 +++++++++++++++++ .../apache/geode/test/dunit/rules/CacheRule.java | 22 +- 4 files changed, 493 insertions(+), 157 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 5b289a9..abc23e7 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -103,6 +103,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { private static final Logger logger = LogService.getLogger(); private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit"); + private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4; protected final CacheServerStats stats; private final int maxConnections; @@ -115,6 +116,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { private final ThreadPoolExecutor hsPool; /** + * A pool used to process client-queue-initializations. 
+ */ + private final ThreadPoolExecutor clientQueueInitPool; + + /** * The port on which this acceptor listens for client connections */ private final int localPort; @@ -534,103 +540,126 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings, this.clientNotifier.getStats()); - { - ThreadPoolExecutor tmp_pool = null; - String gName = "ServerConnection " - // + serverSock.getInetAddress() - + "on port " + this.localPort; - final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); - - ThreadFactory socketThreadFactory = new ThreadFactory() { - int connNum = -1; - - public Thread newThread(final Runnable command) { - int tnum; - synchronized (this) { - tnum = ++connNum; + pool = initializeServerConnectionThreadPool(); + hsPool = initializeHandshakerThreadPool(); + clientQueueInitPool = initializeClientQueueInitializerThreadPool(); + + isAuthenticationRequired = this.securityService.isClientSecurityRequired(); + + isIntegratedSecurity = this.securityService.isIntegratedSecurity(); + + String postAuthzFactoryName = + this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP); + + isPostAuthzCallbackPresent = + (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? 
true : false; + } + + private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException { + String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort; + final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); + + ThreadFactory socketThreadFactory = new ThreadFactory() { + AtomicInteger connNum = new AtomicInteger(-1); + + @Override + public Thread newThread(Runnable command) { + String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet(); + getStats().incAcceptThreadsCreated(); + return new Thread(socketThreadGroup, command, threadName); + } + }; + try { + final BlockingQueue blockingQueue = new SynchronousQueue(); + final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() { + public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) { + try { + blockingQueue.put(r); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); // preserve the state + throw new RejectedExecutionException( + LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex); } - String tName = socketThreadGroup.getName() + " Thread " + tnum; - getStats().incConnectionThreadsCreated(); - Runnable r = new Runnable() { - public void run() { - try { - command.run(); - } catch (CancelException e) { // bug 39463 - // ignore - } finally { - ConnectionTable.releaseThreadsSockets(); - } - } - }; - return new Thread(socketThreadGroup, r, tName); } }; - try { - if (isSelector()) { - tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads, - getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE); - } else { - tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L, - TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory); - } - } catch (IllegalArgumentException poolInitException) { - this.stats.close(); - this.serverSock.close(); - throw poolInitException; - 
} - this.pool = tmp_pool; + logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE); + return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue, + socketThreadFactory, rejectedExecutionHandler); + } catch (IllegalArgumentException poolInitException) { + this.stats.close(); + this.serverSock.close(); + this.pool.shutdownNow(); + throw poolInitException; } - { - ThreadPoolExecutor tmp_hsPool = null; - String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort; - final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); + } - ThreadFactory socketThreadFactory = new ThreadFactory() { - int connNum = -1; + private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException { + final ThreadGroup clientQueueThreadGroup = + LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger); - public Thread newThread(Runnable command) { - int tnum; - synchronized (this) { - tnum = ++connNum; + ThreadFactory clientQueueThreadFactory = new ThreadFactory() { + AtomicInteger connNum = new AtomicInteger(-1); + + @Override + public Thread newThread(final Runnable command) { + String threadName = + clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet(); + Runnable runnable = new Runnable() { + public void run() { + try { + command.run(); + } catch (CancelException e) { + logger.debug("Client Queue Initialization was canceled.", e); + } } - String tName = socketThreadGroup.getName() + " Thread " + tnum; - getStats().incAcceptThreadsCreated(); - return new Thread(socketThreadGroup, command, tName); - } - }; - try { - final BlockingQueue bq = new SynchronousQueue(); - final RejectedExecutionHandler reh = new RejectedExecutionHandler() { - public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) { + }; + return new Thread(clientQueueThreadGroup, runnable, threadName); + } + }; + return new PooledExecutorWithDMStats(new 
SynchronousQueue(), 16, getStats().getCnxPoolHelper(), + clientQueueThreadFactory, 60000); + } + + private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException { + String gName = "ServerConnection " + // + serverSock.getInetAddress() + + "on port " + this.localPort; + final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger); + + ThreadFactory socketThreadFactory = new ThreadFactory() { + AtomicInteger connNum = new AtomicInteger(-1); + + @Override + public Thread newThread(final Runnable command) { + String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet(); + getStats().incConnectionThreadsCreated(); + Runnable r = new Runnable() { + public void run() { try { - bq.put(r); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); // preserve the state - throw new RejectedExecutionException( - LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex); + command.run(); + } catch (CancelException e) { // bug 39463 + // ignore + } finally { + ConnectionTable.releaseThreadsSockets(); } } }; - tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq, - socketThreadFactory, reh); - } catch (IllegalArgumentException poolInitException) { - this.stats.close(); - this.serverSock.close(); - this.pool.shutdownNow(); - throw poolInitException; + return new Thread(socketThreadGroup, r, tName); } - this.hsPool = tmp_hsPool; + }; + try { + if (isSelector()) { + return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads, + getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE); + } else { + return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L, + TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory); + } + } catch (IllegalArgumentException poolInitException) { + this.stats.close(); + this.serverSock.close(); + throw poolInitException; } - - isAuthenticationRequired = 
this.securityService.isClientSecurityRequired(); - - isIntegratedSecurity = this.securityService.isIntegratedSecurity(); - - String postAuthzFactoryName = - this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP); - - isPostAuthzCallbackPresent = - (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false; } public long getAcceptorId() { @@ -666,8 +695,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { @Deprecated private static final int DEPRECATED_SELECTOR_POOL_SIZE = Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue(); - private static final int HANDSHAKE_POOL_SIZE = - Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue(); + private static final int HANDSHAKE_POOL_SIZE = Integer + .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue(); @Override public void start() throws IOException { @@ -802,8 +831,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { * @see SystemFailure#loadEmergencyClasses() */ public static void loadEmergencyClasses() { - if (emergencyClassesLoaded) + if (emergencyClassesLoaded) { return; + } emergencyClassesLoaded = true; CachedRegionHelper.loadEmergencyClasses(); ServerConnection.loadEmergencyClasses(); @@ -870,8 +900,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { private Selector tmpSel; private void checkForStuckKeys() { - if (!WORKAROUND_SELECTOR_BUG) + if (!WORKAROUND_SELECTOR_BUG) { return; + } if (tmpSel == null) { try { tmpSel = Selector.open(); @@ -887,8 +918,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { while (it.hasNext()) { SelectionKey sk = (SelectionKey) it.next(); ServerConnection sc = (ServerConnection) sk.attachment(); - if (sc == null) + if (sc == null) { continue; + } try { sk.cancel(); this.selector.selectNow(); // clear the cancelled key @@ -1040,40 +1072,7 @@ public class 
AcceptorImpl implements Acceptor, Runnable, CommBufferPool { break; } if (events == 0) { - // zeroEventsCount++; - // if (zeroEventsCount > 0) { - // zeroEventsCount = 0; checkForStuckKeys(); - - // try { - // this.selector.close(); // this selector is sick! - // } catch (IOException ignore) { - // } - // this.selector = Selector.open(); - // { - // Iterator it = selectorRegistrations.iterator(); - // while (it.hasNext()) { - // ServerConnection sc = (ServerConnection)it.next(); - // sc.registerWithSelector2(this.selector); - // } - // } - // } - // ArrayList al = new ArrayList(); - // Iterator keysIt = this.selector.keys().iterator(); - // while (keysIt.hasNext()) { - // SelectionKey sk = (SelectionKey)keysIt.next(); - // al.add(sk.attachment()); - // sk.cancel(); - // } - // events = this.selector.selectNow(); - // Iterator alIt = al.iterator(); - // while (alIt.hasNext()) { - // ServerConnection sc = (ServerConnection)alIt.next(); - // sc.registerWithSelector2(this.selector); - // } - // events = this.selector.select(); - // } else { - // zeroEventsCount = 0; } while (events > 0) { int cancelCount = 0; @@ -1130,16 +1129,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { logger.warn( LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected)); } - // } else if (key.isValid() && key.isConnectable()) { - // logger.info("DEBUG isConnectable and isValid key=" + key); - // finishCon(sc); } else { finishCon(sc); if (key.isValid()) { logger.warn(LocalizedMessage.create( LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key)); - // } else { - // logger.info("DEBUG !isValid key=" + key); } } } catch (CancelledKeyException ex) { // fix for bug 37739 @@ -1405,6 +1399,10 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { return this.clientServerCnxCount.get(); } + public boolean isNotifyBySubscription() { + return notifyBySubscription; + } + protected void handleNewClientConnection(final 
Socket socket, final ServerConnectionFactory serverConnectionFactory) throws IOException { // Read the first byte. If this socket is being used for 'client to server' @@ -1427,12 +1425,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { throw new EOFException(); } - if (communicationMode.isSubscriptionFeed()) { - boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient; - logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}", - primary ? "primary" : "secondary", socket); - AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId, - this.notifyBySubscription); + // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue + // initialization to be done in another threadPool + if (initializeClientPools(socket, communicationMode)) { return; } @@ -1498,6 +1493,17 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { } } + private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) { + if (communicationMode.isSubscriptionFeed()) { + boolean isPrimaryServerToClient = + communicationMode == CommunicationMode.PrimaryServerToClient; + clientQueueInitPool + .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this)); + return true; + } + return false; + } + private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException { socket.setSoTimeout(this.acceptTimeout); this.socketCreator.configureServerSSLSocket(socket); @@ -1637,6 +1643,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { Thread.currentThread().interrupt(); this.pool.shutdownNow(); } + this.clientQueueInitPool.shutdownNow(); this.hsPool.shutdownNow(); } @@ -1654,6 +1661,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { return !isRunning() && !thread.isAlive() && (selectorThread == null || !selectorThread.isAlive()) && 
(pool == null || pool.isShutdown()) && (hsPool == null || hsPool.isShutdown()) + && (clientQueueInitPool == null || clientQueueInitPool.isShutdown()) && (selector == null || !selector.isOpen()) && (tmpSel == null || !tmpSel.isOpen()); } @@ -1662,7 +1670,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { * then calculate it. * @return the ip address or host name this acceptor will listen on. An "" if all local addresses * will be listened to. - * * @since GemFire 5.7 */ private static String calcBindHostName(Cache cache, String bindName) { @@ -1791,7 +1798,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { * This method returns a thread safe structure which can be iterated over without worrying about * ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get * client info. - * */ public ServerConnection[] getAllServerConnectionList() { return this.allSCList; @@ -1815,4 +1821,42 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { releaseCommBuffer(Message.setTLCommBuffer(null)); } + + private class ClientQueueInitializerTask implements Runnable { + private final Socket socket; + private final boolean isPrimaryServerToClient; + private final AcceptorImpl acceptor; + + public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient, + AcceptorImpl acceptor) { + this.socket = socket; + this.acceptor = acceptor; + this.isPrimaryServerToClient = isPrimaryServerToClient; + } + + @Override + public void run() { + logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}", + isPrimaryServerToClient ? 
"primary" : "secondary", socket); + try { + acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient, + acceptor.getAcceptorId(), acceptor.isNotifyBySubscription()); + } catch (IOException ex) { + closeSocket(socket); + if (isRunning()) { + if (!acceptor.loggedAcceptError) { + acceptor.loggedAcceptError = true; + if (ex instanceof SocketTimeoutException) { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT)); + } else { + logger.warn(LocalizedMessage.create( + LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0, + ex), ex); + } + } + } + } + } + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 0e510af..74451e5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -189,7 +189,9 @@ public abstract class ServerConnection implements Runnable { */ private volatile int requestSpecificTimeout = -1; - /** Tracks the id of the most recent batch to which a reply has been sent */ + /** + * Tracks the id of the most recent batch to which a reply has been sent + */ private int latestBatchIdReplied = -1; /* @@ -711,8 +713,9 @@ public abstract class ServerConnection implements Runnable { // can be used. 
initializeCommands(); // its initialized in verifyClientConnection call - if (!getCommunicationMode().isWAN()) + if (!getCommunicationMode().isWAN()) { initializeClientUserAuths(); + } } if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) { Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake), @@ -892,7 +895,9 @@ public abstract class ServerConnection implements Runnable { } } if (unregisterClient)// last serverconnection call all close on auth objects + { cleanClientAuths(); + } this.clientUserAuths = null; if (needsUnregister) { this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this); @@ -917,8 +922,9 @@ public abstract class ServerConnection implements Runnable { ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode()); ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua); - if (retCua == null) + if (retCua == null) { return cua; + } return retCua; } @@ -954,8 +960,9 @@ public abstract class ServerConnection implements Runnable { boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId()); // if not successfull, try the old way - if (!removed) + if (!removed) { removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive); + } return removed; } catch (NullPointerException npe) { @@ -1010,7 +1017,6 @@ public abstract class ServerConnection implements Runnable { throw new AuthenticationFailedException("Authentication failed"); } - byte[] credBytes = msg.getPart(0).getSerializedForm(); credBytes = ((HandShake) this.handshake).decryptBytes(credBytes); @@ -1124,6 +1130,7 @@ public abstract class ServerConnection implements Runnable { public void run() { setOwner(); + if (getAcceptor().isSelector()) { boolean finishedMsg = false; try { @@ -1136,9 +1143,7 @@ public abstract class ServerConnection implements Runnable { finishedMsg = true; } } - } catch (java.nio.channels.ClosedChannelException ignore) { - // ok shutting down - } catch (CancelException e) { + } catch 
(java.nio.channels.ClosedChannelException | CancelException ignore) { // ok shutting down } catch (IOException ex) { logger.warn( @@ -1187,6 +1192,7 @@ public abstract class ServerConnection implements Runnable { * If registered with a selector then this will be the key we are registered with. */ // private SelectionKey sKey = null; + /** * Register this connection with the given selector for read events. Note that switch the channel * to non-blocking so it can be in a selector. @@ -1202,7 +1208,8 @@ public abstract class ServerConnection implements Runnable { } public void registerWithSelector2(Selector s) throws IOException { - /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this); + /* this.sKey = */ + getSelectableChannel().register(s, SelectionKey.OP_READ, this); } /** @@ -1225,7 +1232,6 @@ public abstract class ServerConnection implements Runnable { } /** - * * @return String representing the DistributedSystemMembership of the Client VM */ public String getMembershipID() { @@ -1265,10 +1271,11 @@ public abstract class ServerConnection implements Runnable { } protected int getClientReadTimeout() { - if (this.requestSpecificTimeout == -1) + if (this.requestSpecificTimeout == -1) { return this.handshake.getClientReadTimeout(); - else + } else { return this.requestSpecificTimeout; + } } protected boolean isProcessingMessage() { @@ -1519,7 +1526,9 @@ public abstract class ServerConnection implements Runnable { return this.name; } - /** returns the name of this connection */ + /** + * returns the name of this connection + */ public String getName() { return this.name; } @@ -1736,11 +1745,13 @@ public abstract class ServerConnection implements Runnable { // for backward client it will be store in member variable userAuthId // for other look "requestMsg" here and get unique-id from this to get the authzrequest - if (!AcceptorImpl.isAuthenticationRequired()) + if (!AcceptorImpl.isAuthenticationRequired()) { return null; + } - if 
(AcceptorImpl.isIntegratedSecurity()) + if (AcceptorImpl.isIntegratedSecurity()) { return null; + } long uniqueId = getUniqueId(); @@ -1768,11 +1779,13 @@ public abstract class ServerConnection implements Runnable { public AuthorizeRequestPP getPostAuthzRequest() throws AuthenticationRequiredException, IOException { - if (!AcceptorImpl.isAuthenticationRequired()) + if (!AcceptorImpl.isAuthenticationRequired()) { return null; + } - if (AcceptorImpl.isIntegratedSecurity()) + if (AcceptorImpl.isIntegratedSecurity()) { return null; + } // look client version and return authzrequest // for backward client it will be store in member variable userAuthId @@ -1799,7 +1812,9 @@ public abstract class ServerConnection implements Runnable { return postAuthReq; } - /** returns the member ID byte array to be used for creating EventID objects */ + /** + * returns the member ID byte array to be used for creating EventID objects + */ public byte[] getEventMemberIDByteArray() { return this.memberIdByteArray; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java new file mode 100644 index 0000000..c0b2d07 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.tier.sockets; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.rmi.RemoteException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.EntryEvent; +import org.apache.geode.cache.InterestPolicy; +import org.apache.geode.cache.InterestResultPolicy; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.SubscriptionAttributes; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.cache.server.ClientSubscriptionConfig; +import org.apache.geode.cache.util.CacheListenerAdapter; +import org.apache.geode.distributed.DistributedLockBlackboard; +import org.apache.geode.distributed.DistributedLockBlackboardImpl; +import org.apache.geode.internal.cache.DiskStoreAttributes; +import org.apache.geode.internal.cache.InitialImageOperation; +import 
org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.dunit.rules.DistributedTestRule; +import org.apache.geode.test.dunit.rules.SharedCountersRule; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; +import org.apache.geode.test.junit.rules.serializable.SerializableTestName; + +@Category(DistributedTest.class) +public class AcceptorImplClientQueueDUnitTest implements Serializable { + private final Host host = Host.getHost(0); + private static final int numberOfEntries = 200; + private static final AtomicInteger eventCount = new AtomicInteger(0); + private static final AtomicBoolean completedClient2 = new AtomicBoolean(false); + + @ClassRule + public static DistributedTestRule distributedTestRule = new DistributedTestRule(); + + @Rule + public CacheRule cacheRule = + CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1)) + .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build(); + + @Rule + public SerializableTestName name = new SerializableTestName(); + + @Rule + public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder(); + + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); + + private DistributedLockBlackboard blackboard = null; + + @Before + public void setup() throws Exception { + blackboard = DistributedLockBlackboardImpl.getInstance(); + } + + @After + public void tearDown() throws RemoteException { + blackboard.initCount(); + host.getAllVMs().forEach((vm) -> vm.invoke(() -> { + InitialImageOperation.slowImageProcessing = 0; + 
System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE"); + })); + } + + @Test + public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception { + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + VM vm3 = host.getVM(3); + int vm0_port = vm0.invoke("Start server with subscription turned on", () -> { + try { + return createSubscriptionServer(cacheRule.getCache()); + } catch (IOException e) { + return 0; + } + }); + + vm2.invoke("Start Client1 with durable interest registration turned on", () -> { + ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); + clientCacheFactory.setPoolSubscriptionEnabled(true); + clientCacheFactory.setPoolSubscriptionRedundancy(1); + clientCacheFactory.setPoolReadTimeout(200); + clientCacheFactory.addPoolServer(host.getHostName(), vm0_port); + ClientCache cache = clientCacheFactory.set("durable-client-id", "1") + .set("durable-client-timeout", "300").set("mcast-port", "0").create(); + ClientRegionFactory<Object, Object> clientRegionFactory = + cache.createClientRegionFactory(ClientRegionShortcut.PROXY); + Region region = clientRegionFactory.create("subscriptionRegion"); + + region.registerInterestRegex(".*", InterestResultPolicy.NONE, true); + cache.readyForEvents(); + cache.close(true); + }); + vm3.invoke("Start Client2 to add entries to region", () -> { + ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); + clientCacheFactory.addPoolServer(host.getHostName(), vm0_port); + ClientCache cache = clientCacheFactory.set("mcast-port", "0").create(); + ClientRegionFactory<Object, Object> clientRegionFactory = + cache.createClientRegionFactory(ClientRegionShortcut.PROXY); + Region region = clientRegionFactory.create("subscriptionRegion"); + + for (int i = 0; i < numberOfEntries; i++) { + region.put(i, i); + } + cache.close(); + }); + + int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> { + try { + int serverPort = 
createSubscriptionServer(cacheRule.getCache()); + InitialImageOperation.slowImageProcessing = 30; + return serverPort; + } catch (IOException e) { + return 0; + } + }); + + vm0.invoke("Turn on slow image processsing", () -> { + InitialImageOperation.slowImageProcessing = 30; + }); + + AsyncInvocation<Boolean> completedClient1 = + vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> { + + ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); + clientCacheFactory.setPoolSubscriptionEnabled(true); + clientCacheFactory.setPoolSubscriptionRedundancy(1); + clientCacheFactory.setPoolMinConnections(1); + clientCacheFactory.setPoolMaxConnections(1); + clientCacheFactory.setPoolReadTimeout(200); + clientCacheFactory.addPoolServer(host.getHostName(), vm1_port); + ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1") + .set("durable-client-timeout", "300").set("mcast-port", "0"); + blackboard.incCount(); + ClientCache cache = cacheFactory.create(); + + ClientRegionFactory<Object, Object> clientRegionFactory = + cache.createClientRegionFactory(ClientRegionShortcut.PROXY); + Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() { + @Override + public void afterCreate(EntryEvent event) { + eventCount.incrementAndGet(); + } + + @Override + public void afterUpdate(EntryEvent event) { + eventCount.incrementAndGet(); + } + }).create("subscriptionRegion"); + + region.registerInterestRegex(".*", InterestResultPolicy.NONE, true); + cache.readyForEvents(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS) + .until(() -> eventCount.get() == numberOfEntries); + cache.close(); + return eventCount.get() == numberOfEntries; + }); + + vm3.invokeAsync("Start Client2 to add entries to region", () -> { + while (true) { + Thread.sleep(100); + if (blackboard.getCount() == 1) { + break; + } + } + ClientCache cache = null; + ClientCacheFactory clientCacheFactory = new 
ClientCacheFactory(); + clientCacheFactory.setPoolRetryAttempts(0); + clientCacheFactory.setPoolMinConnections(1); + clientCacheFactory.setPoolMaxConnections(1); + clientCacheFactory.setPoolReadTimeout(200); + clientCacheFactory.setPoolSocketConnectTimeout(500); + clientCacheFactory.addPoolServer(host.getHostName(), vm1_port); + cache = clientCacheFactory.set("mcast-port", "0").create(); + ClientRegionFactory<Object, Object> clientRegionFactory = + cache.createClientRegionFactory(ClientRegionShortcut.PROXY); + Region region = clientRegionFactory.create("subscriptionRegion"); + + int returnValue = 0; + for (int i = 0; i < 100; i++) { + returnValue = (int) region.get(i); + } + cache.close(); + completedClient2.set(returnValue == 99); + }); + assertTrue(completedClient1.get()); + assertTrue(vm3.invoke(() -> completedClient2.get())); + } + + private int createSubscriptionServer(InternalCache cache) throws IOException { + initializeDiskStore(cache); + initializeReplicateRegion(cache); + return initializeCacheServerWithSubscription(host, cache); + } + + private void initializeDiskStore(InternalCache cache) throws IOException { + DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes(); + diskStoreAttributes.name = "clientQueueDS"; + diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")}; + cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS"); + } + + private void initializeReplicateRegion(InternalCache cache) { + cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true) + .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL)) + .create("subscriptionRegion"); + } + + private int initializeCacheServerWithSubscription(Host host, InternalCache cache) + throws IOException { + CacheServer cacheServer1 = cache.addCacheServer(false); + ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig(); + clientSubscriptionConfig.setEvictionPolicy("entry"); + 
clientSubscriptionConfig.setCapacity(5); + clientSubscriptionConfig.setDiskStoreName("clientQueueDS"); + cacheServer1.setPort(0); + cacheServer1.setHostnameForClients(host.getHostName()); + cacheServer1.start(); + return cacheServer1.getPort(); + } +} diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java index dc42da8..b65bf86 100644 --- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java +++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java @@ -63,6 +63,7 @@ public class CacheRule extends DistributedExternalResource { private final boolean disconnectAfter; private final List<VM> createCacheInVMs; private final Properties config; + private final Properties systemProperties; public static Builder builder() { return new Builder(); @@ -74,18 +75,19 @@ public class CacheRule extends DistributedExternalResource { this.disconnectAfter = builder.disconnectAfter; this.createCacheInVMs = builder.createCacheInVMs; this.config = builder.config; + this.systemProperties = builder.systemProperties; } @Override protected void before() { if (createCacheInAll) { - invoker().invokeInEveryVMAndController(() -> createCache(config)); + invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties)); } else { if (createCache) { - createCache(config); + createCache(config, systemProperties); } for (VM vm : createCacheInVMs) { - vm.invoke(() -> createCache(config)); + vm.invoke(() -> createCache(config, systemProperties)); } } } @@ -108,7 +110,8 @@ public class CacheRule extends DistributedExternalResource { return cache.getInternalDistributedSystem(); } - private static void createCache(final Properties config) { + private static void createCache(final Properties config, final Properties systemProperties) { + System.getProperties().putAll(systemProperties); cache = (InternalCache) new 
CacheFactory(config).create(); } @@ -141,6 +144,7 @@ public class CacheRule extends DistributedExternalResource { private boolean disconnectAfter; private List<VM> createCacheInVMs = new ArrayList<>(); private Properties config = new Properties(); + private Properties systemProperties = new Properties(); public Builder() { config.setProperty(LOCATORS, getLocators()); @@ -195,6 +199,16 @@ public class CacheRule extends DistributedExternalResource { return this; } + public Builder addSystemProperty(final String key, final String value) { + this.systemProperties.put(key, value); + return this; + } + + public Builder addSystemProperties(final Properties config) { + this.systemProperties.putAll(config); + return this; + } + public CacheRule build() { return new CacheRule(this); } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
