This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch feature/GEODE-3637 in repository https://gitbox.apache.org/repos/asf/geode.git
commit ee610d65da43c55ca7e43988addc74171f82c3e4 Author: kohlmu-pivotal <[email protected]> AuthorDate: Wed Oct 25 12:37:50 2017 -0700 GEODE-3637: Moved client queue initialization into the ServerConnection.java --- .../internal/cache/tier/sockets/AcceptorImpl.java | 21 +++--- .../cache/tier/sockets/ServerConnection.java | 77 +++++++++++++++------- 2 files changed, 65 insertions(+), 33 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index 59ef466..ad910bd 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -1405,6 +1405,10 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { return this.clientServerCnxCount.get(); } + public boolean isNotifyBySubscription() { + return notifyBySubscription; + } + protected void handleNewClientConnection(final Socket socket, final ServerConnectionFactory serverConnectionFactory) throws IOException { // Read the first byte. If this socket is being used for 'client to server' @@ -1427,14 +1431,15 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool { throw new EOFException(); } - if (communicationMode.isSubscriptionFeed()) { - boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient; - logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}", - primary ? "primary" : "secondary", socket); - AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId, - this.notifyBySubscription); - return; - } + // GEODE-3637 + // if (communicationMode.isSubscriptionFeed()) { + // boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient; + // logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}", + // primary ? "primary" : "secondary", socket); + // AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId, + // this.notifyBySubscription); + // return; + // } logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode, socket); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java index 7fc688c..5c69769 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java @@ -72,7 +72,7 @@ import org.apache.geode.security.GemFireSecurityException; * Provides an implementation for the server socket end of the hierarchical cache connection. Each * server connection runs in its own thread to maximize concurrency and improve response times to * edge requests - * + * * @since GemFire 2.0.2 */ public abstract class ServerConnection implements Runnable { @@ -189,16 +189,18 @@ public abstract class ServerConnection implements Runnable { */ private volatile int requestSpecificTimeout = -1; - /** Tracks the id of the most recent batch to which a reply has been sent */ + /** + * Tracks the id of the most recent batch to which a reply has been sent + */ private int latestBatchIdReplied = -1; /* * Uniquely identifying the client's Distributed System * - * + * * private String membershipId; - * - * + * + * * Uniquely identifying the client's ConnectionProxy object * * @@ -711,8 +713,9 @@ public abstract class ServerConnection implements Runnable { // can be used. initializeCommands(); // its initialized in verifyClientConnection call - if (!getCommunicationMode().isWAN()) + if (!getCommunicationMode().isWAN()) { initializeClientUserAuths(); + } } if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) { Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake), @@ -892,7 +895,9 @@ public abstract class ServerConnection implements Runnable { } } if (unregisterClient)// last serverconnection call all close on auth objects + { cleanClientAuths(); + } this.clientUserAuths = null; if (needsUnregister) { this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this); @@ -917,8 +922,9 @@ public abstract class ServerConnection implements Runnable { ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode()); ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua); - if (retCua == null) + if (retCua == null) { return cua; + } return retCua; } @@ -954,8 +960,9 @@ public abstract class ServerConnection implements Runnable { boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId()); // if not successfull, try the old way - if (!removed) + if (!removed) { removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive); + } return removed; } catch (NullPointerException npe) { @@ -984,7 +991,7 @@ public abstract class ServerConnection implements Runnable { /* * This means that client and server VMs have different security settings. The server does * not have any security settings specified while client has. - * + * * Here, should we just ignore this and send the dummy security part (connectionId, userId) * in the response (in this case, client needs to know that it is not expected to read any * security part in any of the server response messages) or just throw an exception @@ -1010,7 +1017,6 @@ public abstract class ServerConnection implements Runnable { throw new AuthenticationFailedException("Authentication failed"); } - byte[] credBytes = msg.getPart(0).getSerializedForm(); credBytes = ((HandShake) this.handshake).decryptBytes(credBytes); @@ -1066,7 +1072,7 @@ public abstract class ServerConnection implements Runnable { /** * MessageType of the messages (typically internal commands) which do not need to participate in * security should be added in the following if block. - * + * * @return Part * @see AbstractOp#processSecureBytes(Connection, Message) * @see AbstractOp#needsUserId() @@ -1124,9 +1130,12 @@ public abstract class ServerConnection implements Runnable { public void run() { setOwner(); + if (getAcceptor().isSelector()) { boolean finishedMsg = false; try { + initializeClientNofication(); + this.stats.decThreadQueueSize(); if (!isTerminated()) { getAcceptor().setTLCommBuffer(); @@ -1136,9 +1145,7 @@ public abstract class ServerConnection implements Runnable { finishedMsg = true; } } - } catch (java.nio.channels.ClosedChannelException ignore) { - // ok shutting down - } catch (CancelException e) { + } catch (java.nio.channels.ClosedChannelException | CancelException ignore) { // ok shutting down } catch (IOException ex) { logger.warn( @@ -1183,10 +1190,21 @@ public abstract class ServerConnection implements Runnable { } } + private void initializeClientNofication() throws IOException { + if (communicationMode.isSubscriptionFeed()) { + boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient; + logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}", + primary ? "primary" : "secondary", theSocket); + getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary, + getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription()); + } + } + /** * If registered with a selector then this will be the key we are registered with. */ // private SelectionKey sKey = null; + /** * Register this connection with the given selector for read events. Note that switch the channel * to non-blocking so it can be in a selector. @@ -1202,7 +1220,8 @@ public abstract class ServerConnection implements Runnable { } public void registerWithSelector2(Selector s) throws IOException { - /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this); + /* this.sKey = */ + getSelectableChannel().register(s, SelectionKey.OP_READ, this); } /** @@ -1225,7 +1244,6 @@ public abstract class ServerConnection implements Runnable { } /** - * * @return String representing the DistributedSystemMembership of the Client VM */ public String getMembershipID() { @@ -1265,10 +1283,11 @@ public abstract class ServerConnection implements Runnable { } protected int getClientReadTimeout() { - if (this.requestSpecificTimeout == -1) + if (this.requestSpecificTimeout == -1) { return this.handshake.getClientReadTimeout(); - else + } else { return this.requestSpecificTimeout; + } } protected boolean isProcessingMessage() { @@ -1492,7 +1511,7 @@ public abstract class ServerConnection implements Runnable { /** * Just ensure that this class gets loaded. - * + * * @see SystemFailure#loadEmergencyClasses() */ public static void loadEmergencyClasses() { @@ -1519,7 +1538,9 @@ public abstract class ServerConnection implements Runnable { return this.name; } - /** returns the name of this connection */ + /** + * returns the name of this connection + */ public String getName() { return this.name; } @@ -1736,11 +1757,13 @@ public abstract class ServerConnection implements Runnable { // for backward client it will be store in member variable userAuthId // for other look "requestMsg" here and get unique-id from this to get the authzrequest - if (!AcceptorImpl.isAuthenticationRequired()) + if (!AcceptorImpl.isAuthenticationRequired()) { return null; + } - if (AcceptorImpl.isIntegratedSecurity()) + if (AcceptorImpl.isIntegratedSecurity()) { return null; + } long uniqueId = getUniqueId(); @@ -1768,11 +1791,13 @@ public abstract class ServerConnection implements Runnable { public AuthorizeRequestPP getPostAuthzRequest() throws AuthenticationRequiredException, IOException { - if (!AcceptorImpl.isAuthenticationRequired()) + if (!AcceptorImpl.isAuthenticationRequired()) { return null; + } - if (AcceptorImpl.isIntegratedSecurity()) + if (AcceptorImpl.isIntegratedSecurity()) { return null; + } // look client version and return authzrequest // for backward client it will be store in member variable userAuthId @@ -1799,7 +1824,9 @@ public abstract class ServerConnection implements Runnable { return postAuthReq; } - /** returns the member ID byte array to be used for creating EventID objects */ + /** + * returns the member ID byte array to be used for creating EventID objects + */ public byte[] getEventMemberIDByteArray() { return this.memberIdByteArray; } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
