Repository: incubator-geode Updated Branches: refs/heads/develop f7670e168 -> 507f2f3a9
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java index 41e5837..a2801c1 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/TCPConduit.java @@ -45,7 +45,6 @@ import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.CancelCriterion; import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.SystemFailure; -import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.DMStats; @@ -64,7 +63,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; import com.gemstone.gemfire.internal.logging.log4j.LogMarker; /** <p>TCPConduit manages a server socket and a collection of connections to - other systems. Connections are identified by DistributedMember IDs. + other systems. Connections are identified by host/port Stubs. These types of messages are currently supported:</p><pre> DistributionMessage - message is delivered to the server's @@ -176,8 +175,9 @@ public class TCPConduit implements Runnable { ////////////////// runtime state that is re-initialized on a restart - /** server socket address */ - private InetSocketAddress id; + /** id is an endpoint Stub representing this server. It holds the + actual port the server is listening on */ + private Stub id; protected volatile boolean stopped; @@ -351,7 +351,7 @@ public class TCPConduit implements Runnable { try { localPort = socket.getLocalPort(); - id = new InetSocketAddress(socket.getInetAddress(), localPort); + id = new Stub(socket.getInetAddress(), localPort, 0); stopped = false; ThreadGroup group = LoggingThreadGroup.createThreadGroup("P2P Listener Threads", logger); @@ -371,12 +371,23 @@ public class TCPConduit implements Runnable { } } catch (IOException io) { - String s = "While creating ServerSocket on port " + p; + String s = "While creating ServerSocket and Stub on port " + p; throw new ConnectionException(s, io); } this.port = localPort; } + /** + * After startup we install the view ID into the conduit stub to avoid + * confusion during overlapping shutdown/startup from the same member. + * + * @param viewID + */ + public void setVmViewID(int viewID) { + this.id.setViewID(viewID); + } + + /** creates the server sockets. This can be used to recreate the * socket using this.port and this.bindAddress, which must be set * before invoking this method. @@ -585,7 +596,7 @@ public class TCPConduit implements Runnable { public void run() { ConnectionTable.threadWantsSharedResources(); if (logger.isTraceEnabled(LogMarker.DM)) { - logger.trace(LogMarker.DM, "Starting P2P Listener on {}", id); + logger.trace(LogMarker.DM, "Starting P2P Listener on {}", this.getId()); } for(;;) { SystemFailure.checkFailure(); @@ -720,7 +731,7 @@ public class TCPConduit implements Runnable { } // for if (logger.isTraceEnabled(LogMarker.DM)) { - logger.debug("Stopped P2P Listener on {}", id); + logger.debug("Stopped P2P Listener on {}", this.getId()); } } @@ -796,7 +807,7 @@ public class TCPConduit implements Runnable { * @since 5.1 */ public void getThreadOwnedOrderedConnectionState( - DistributedMember member, + Stub member, Map result) { getConTable().getThreadOwnedOrderedConnectionState(member, result); @@ -808,7 +819,7 @@ public class TCPConduit implements Runnable { * with the key * @since 5.1 */ - public void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map channelState) + public void waitForThreadOwnedOrderedConnectionState(Stub member, Map channelState) throws InterruptedException { // if (Thread.interrupted()) throw new InterruptedException(); not necessary done in waitForThreadOwnedOrderedConnectionState @@ -831,12 +842,13 @@ public class TCPConduit implements Runnable { msg.setBytesRead(bytesRead); msg.setSender(receiver.getRemoteAddress()); msg.setSharedReceiver(receiver.isSharedResource()); - directChannel.receive(msg, bytesRead); + directChannel.receive(msg, bytesRead, receiver.getRemoteId()); } } - /** gets the address of this conduit's ServerSocket endpoint */ - public InetSocketAddress getId() { + /** gets the Stub representing this conduit's ServerSocket endpoint. This + is used to generate other stubs containing endpoint information. */ + public Stub getId() { return id; } @@ -858,16 +870,21 @@ public class TCPConduit implements Runnable { } - /** gets the channel that is used to process non-DistributedMember messages */ + /** gets the channel that is used to process non-Stub messages */ public DirectChannel getDirectChannel() { return directChannel; } + public InternalDistributedMember getMemberForStub(Stub s, boolean validate) { + return membershipManager.getMemberForStub(s, validate); + } + public void setLocalAddr(InternalDistributedMember addr) { localAddr = addr; + this.id.setViewID(addr.getVmViewId()); } - public InternalDistributedMember getLocalAddr() { + public InternalDistributedMember getLocalId() { return localAddr; } @@ -877,6 +894,7 @@ public class TCPConduit implements Runnable { * member is in the membership view and the system is not shutting down. * * @param memberAddress the IDS associated with the remoteId + * @param remoteId the TCPConduit stub for this member * @param preserveOrder whether this is an ordered or unordered connection * @param retry false if this is the first attempt * @param startTime the time this operation started @@ -884,8 +902,8 @@ public class TCPConduit implements Runnable { * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted (or zero) * @return the connection */ - public Connection getConnection(InternalDistributedMember memberAddress, final boolean preserveOrder, boolean retry, long startTime, - long ackTimeout, long ackSATimeout) + public Connection getConnection(InternalDistributedMember memberAddress, Stub remoteId, final boolean preserveOrder, boolean retry, + long startTime, long ackTimeout, long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException { //final boolean preserveOrder = (processorType == DistributionManager.SERIAL_EXECUTOR )|| (processorType == DistributionManager.PARTITIONED_REGION_EXECUTOR); @@ -904,7 +922,11 @@ public class TCPConduit implements Runnable { // problems. Tear down the connection so that it gets // rebuilt. if (retry || conn != null) { // not first time in loop - if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress) || membershipManager.shutdownInProgress()) { + // Consult with the membership manager; if member has gone away, + // there will not be an entry for this stub. + InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true); + if (m == null) { + // OK, the member left. Just register an error. throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString()); } // bug35953: Member is still in view; we MUST NOT give up! @@ -919,14 +941,15 @@ public class TCPConduit implements Runnable { } // try again after sleep - if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { + m = this.membershipManager.getMemberForStub(remoteId, true); + if (m == null) { // OK, the member left. Just register an error. throw new IOException(LocalizedStrings.TCPConduit_TCPIP_CONNECTION_LOST_AND_MEMBER_IS_NOT_IN_VIEW.toLocalizedString()); } // Print a warning (once) if (memberInTrouble == null) { - memberInTrouble = memberAddress; + memberInTrouble = m; logger.warn(LocalizedMessage.create(LocalizedStrings.TCPConduit_ATTEMPTING_TCPIP_RECONNECT_TO__0, memberInTrouble)); } else { @@ -940,8 +963,8 @@ public class TCPConduit implements Runnable { if (conn != null) { try { if (logger.isDebugEnabled()) { - logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}", - conn, memberInTrouble); + logger.debug("Closing old connection. conn={} before retrying. remoteID={} memberInTrouble={}", + conn, remoteId, memberInTrouble); } conn.closeForReconnect("closing before retrying"); } @@ -962,10 +985,10 @@ public class TCPConduit implements Runnable { boolean debugRetry = false; do { retryForOldConnection = false; - conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, ackSATimeout); + conn = getConTable().get(remoteId, preserveOrder, startTime, ackTimeout, ackSATimeout); if (conn == null) { // conduit may be closed - otherwise an ioexception would be thrown - problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(memberAddress)); + problem = new IOException(LocalizedStrings.TCPConduit_UNABLE_TO_RECONNECT_TO_SERVER_POSSIBLE_SHUTDOWN_0.toLocalizedString(remoteId)); } else if (conn.isClosing() || !conn.getRemoteAddress().equals(memberAddress)) { if (logger.isDebugEnabled()) { logger.debug("Got an old connection for {}: {}@{}", memberAddress, conn, conn.hashCode()); @@ -1004,14 +1027,15 @@ public class TCPConduit implements Runnable { if (problem != null) { // Some problems are not recoverable; check and error out early. - if (!membershipManager.memberExists(memberAddress) || membershipManager.isShunned(memberAddress)) { // left the view + InternalDistributedMember m = this.membershipManager.getMemberForStub(remoteId, true); + if (m == null) { // left the view // Bracket our original warning if (memberInTrouble != null) { // make this msg info to bracket warning logger.info(LocalizedMessage.create( LocalizedStrings.TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED, memberInTrouble)); } - throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(memberAddress)); + throw new IOException(LocalizedStrings.TCPConduit_PEER_HAS_DISAPPEARED_FROM_VIEW.toLocalizedString(remoteId)); } // left the view if (membershipManager.shutdownInProgress()) { // shutdown in progress @@ -1030,12 +1054,12 @@ public class TCPConduit implements Runnable { if (memberInTrouble == null) { logger.warn(LocalizedMessage.create( LocalizedStrings.TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1, - new Object[] {memberAddress, problem})); - memberInTrouble = memberAddress; + new Object[] {m, problem})); + memberInTrouble = m; } else { if (logger.isDebugEnabled()) { - logger.debug("Error sending message to {}", memberAddress, problem); + logger.debug("Error sending message to {}", m, problem); } } @@ -1049,7 +1073,7 @@ public class TCPConduit implements Runnable { throw (IOException)problem; } else { - IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(memberAddress)); + IOException ioe = new IOException( LocalizedStrings.TCPConduit_PROBLEM_CONNECTING_TO_0.toLocalizedString(remoteId)); ioe.initCause(problem); throw ioe; } @@ -1065,8 +1089,8 @@ public class TCPConduit implements Runnable { LocalizedStrings.TCPConduit_SUCCESSFULLY_RECONNECTED_TO_MEMBER_0, memberInTrouble)); if (logger.isTraceEnabled()) { - logger.trace("new connection is {} memberAddress={}", conn, memberAddress); - } + logger.trace("new connection is {} remoteId={} memberAddress={}", conn, remoteId, memberAddress); + } } return conn; } @@ -1078,6 +1102,180 @@ public class TCPConduit implements Runnable { } // for(;;) } +// /** +// * Send a message. +// * @return the connection used to send the message +// * @throws IOException if peer departed view or shutdown in progress +// */ +// private Connection send(Stub remoteId, ByteBuffer bb, boolean preserveOrder, DistributionMessage msg) +// throws java.io.IOException +// { +// if (stopped) { +// throw new ConduitStoppedException("The conduit is stopped"); +// } + +// if (!QUIET) { +// LogWriterI18n l = getLogger(); +// if (l.finerEnabled()) { +// l.finer(id.toString() + " sending " + bb +// + " to " + remoteId); +// } +// } + +// Connection conn = null; +// InternalDistributedMember memberInTrouble = null; +// for (;;) { +// // If this is the second time through this loop, we had +// // problems. Tear down the connection so that it gets +// // rebuilt. +// if (conn != null) { // not first time in loop +// // Consult with the membership manager; if member has gone away, +// // there will not be an entry for this stub. +// InternalDistributedMember m = membershipManager.getMemberForStub(remoteId); +// if (m == null) { +// // OK, the member left. Just register an error. +// throw new IOException("TCP/IP connection lost and member no longer in view"); +// } +// // bug35953: Member is still in view; we MUST NOT give up! + +// // Pause just a tiny bit... +// try { +// Thread.sleep(5000); +// } +// catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// if (membershipManager.shutdownInProgress()) { // shutdown in progress +// // Bracket our original warning +// if (memberInTrouble != null) { +// logger.info("Ending retry attempt because shutdown has started."); +// } +// throw new IOException("Abandoned because shutdown is in progress"); +// } // shutdown in progress + +// // Strange random interrupt intercepted? +// logger.warning("Thread has been interrupted but no shutdown in progress", e); +// throw new DistributedSystemDisconnectedException(e); +// } + +// // Print a warning (once) +// if (memberInTrouble == null) { +// memberInTrouble = m; +// getLogger().warning("Attempting TCP/IP reconnect to " + memberInTrouble); +// } +// else { +// getLogger().fine("Attempting TCP/IP reconnect to " + memberInTrouble); +// } + +// // Close the connection (it will get rebuilt later). +// this.stats.incReconnectAttempts(); +// try { +// conn.closeForReconnect("closing before retrying"); +// } +// catch (CancelException ex) { +// // In general we ignore close problems, but if the system +// // is shutting down, we should just quit. +// throw ex; +// } +// catch (Exception ex) { +// } +// } // not first time in loop + +// // Do the send +// Exception problem = null; +// try { +// // Get (or regenerate) the connection +// // bug36202: this could generate a ConnectionException, so it +// // must be caught and retried +// conn = getConTable().get(remoteId, preserveOrder); +// // getLogger().info ("connections returned " + conn); +// if (conn == null) { +// // conduit may be closed - otherwise an ioexception would be thrown +// throw new IOException("Unable to reconnect to server; possible shutdown: " +// + remoteId); +// } + +// conn.sendPreserialized(bb, msg); +// } +// catch (ConnectionException e) { +// // Race condition between acquiring the connection and attempting +// // to use it: another thread closed it. +// problem = e; +// } +// catch (IOException e) { +// problem = e; +// } + +// if (problem != null) { +// // Some problems are not recoverable; check an error out early. +// InternalDistributedMember m = membershipManager.getMemberForStub(remoteId); +// if (m == null) { // left the view +// // Bracket our original warning +// if (memberInTrouble != null) { +// logger.info("Ending retry attempt because " + memberInTrouble +// + " has disappeared."); +// } +// throw new IOException("Peer has disappeared from view"); +// } // left the view + +// if (membershipManager.shutdownInProgress()) { // shutdown in progress +// // Bracket our original warning +// if (memberInTrouble != null) { +// logger.info("Ending retry attempt because shutdown has started."); +// } +// throw new IOException("Abandoned because shutdown is in progress"); +// } // shutdown in progress + +// if (endpointRemoved(remoteId)) { // endpoint removed +// // TODO what does this mean? +// // Bracket our original warning +// if (memberInTrouble != null) { +// logger.info("Ending retry attempt because " + memberInTrouble +// + " has lost its endpoint."); +// } +// throw new IOException("Endpoint was removed"); +// } // endpoint removed + +// // Log the warning. We wait until now, because we want +// // to have m defined for a nice message... +// if (memberInTrouble == null) { +// logger.warning( +// "Error sending message to " + m + " (will reattempt): " +// + problem.toString(), +// logger.finerEnabled() ? problem : null); +// memberInTrouble = m; +// } +// else { +// logger.fine("Error sending message to " + m, problem); +// } + +// // Retry the operation (indefinitely) +// continue; +// } // problem != null +// // Success! + +// // Make sure our logging is bracketed if there was a problem +// if (memberInTrouble != null) { +// logger.info("Successfully reestablished connection to server " +// + memberInTrouble); +// } +// return conn; +// } // while retry +// } + +// /** +// * Sends an already serialized message in a byte buffer +// * to the given endpoint. Waits for the send to complete +// * before returning. +// * @return the connection used to send the message +// */ +// public Connection sendSync(Stub remoteId, ByteBuffer bb, int processorType, DistributionMessage msg) +// throws java.io.IOException +// { +// return send(remoteId, bb, +// processorType == DistributionManager.SERIAL_EXECUTOR, +// msg); +// } + @Override public String toString() { return "" + id; @@ -1103,22 +1301,22 @@ public class TCPConduit implements Runnable { return directChannel.getDM(); } /** - * Closes any connections used to communicate with the given member + * Closes any connections used to communicate with the given stub */ - public void removeEndpoint(DistributedMember mbr, String reason) { - removeEndpoint(mbr, reason, true); + public void removeEndpoint(Stub stub, String reason) { + removeEndpoint(stub, reason, true); } - public void removeEndpoint(DistributedMember mbr, String reason, boolean notifyDisconnect) { + public void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) { ConnectionTable ct = this.conTable; if (ct == null) { return; } - ct.removeEndpoint(mbr, reason, notifyDisconnect); + ct.removeEndpoint(stub, reason, notifyDisconnect); } /** check to see if there are still any receiver threads for the given end-point */ - public boolean hasReceiversFor(DistributedMember endPoint) { + public boolean hasReceiversFor(Stub endPoint) { ConnectionTable ct = this.conTable; return (ct != null) && ct.hasReceiversFor(endPoint); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java index 773ef38..1f411bb 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java @@ -38,6 +38,7 @@ import com.gemstone.gemfire.cache.RegionEvent; import com.gemstone.gemfire.cache.RegionFactory; import com.gemstone.gemfire.cache.Scope; import com.gemstone.gemfire.cache.util.CacheListenerAdapter; +import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.distributed.internal.membership.MembershipManager; @@ -45,10 +46,13 @@ import com.gemstone.gemfire.distributed.internal.membership.NetView; import com.gemstone.gemfire.distributed.internal.membership.gms.MembershipManagerHelper; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager; import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager; +import com.gemstone.gemfire.internal.AvailablePort; import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.tcp.Stub; import dunit.DistributedTestCase; import dunit.Host; +import dunit.SerializableCallable; import dunit.SerializableRunnable; import dunit.VM; @@ -188,7 +192,7 @@ public class DistributionManagerDUnitTest extends DistributedTestCase { sys.getLogWriter().info("<ExpectedException action=add>attempt to add old member</ExpectedException>"); sys.getLogWriter().info("<ExpectedException action=add>Removing shunned GemFire node</ExpectedException>"); try { - boolean accepted = mgr.addSurpriseMember(mbr); + boolean accepted = mgr.addSurpriseMember(mbr, new Stub()); Assert.assertTrue("member with old ID was not rejected (bug #44566)", !accepted); } finally { sys.getLogWriter().info("<ExpectedException action=remove>attempt to add old member</ExpectedException>"); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java index ddbda0b..44e1b46 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManagerJUnitTest.java @@ -16,18 +16,8 @@ */ package com.gemstone.gemfire.distributed.internal.membership.gms.mgr; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; import java.util.ArrayList; import java.util.Arrays; @@ -46,6 +36,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; +import com.gemstone.gemfire.distributed.internal.AdminMessageType; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.DistributionConfig; @@ -53,6 +44,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; import com.gemstone.gemfire.distributed.internal.DistributionManager; import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.distributed.internal.HighPriorityAckedMessage; +import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.distributed.internal.MembershipListener; import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; @@ -68,11 +60,19 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Authenticator; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.HealthMonitor; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave; +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger; +import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave; +import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager; import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager.StartupEvent; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.admin.remote.AdminRequest; +import com.gemstone.gemfire.internal.admin.remote.AdminResponse; import com.gemstone.gemfire.internal.admin.remote.AlertListenerMessage; import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig; import com.gemstone.gemfire.internal.tcp.ConnectExceptions; +import com.gemstone.gemfire.internal.tcp.Stub; import com.gemstone.gemfire.test.junit.categories.UnitTest; @Category(UnitTest.class) @@ -295,6 +295,13 @@ public class GMSMembershipManagerJUnitTest { suspectMember = mockMembers[1]; manager.handleOrDeferSuspect(new SuspectMember(mockMembers[0], suspectMember, "testing")); verify(listener).memberSuspect(suspectMember, mockMembers[0], "testing"); + + InternalDistributedMember mbr = manager.getMemberForStub(new Stub(myMemberId.getInetAddress(), 2033, 20), false); + assertTrue(mbr == null); + myMemberId.setDirectChannelPort(2033); + mbr = manager.getMemberForStub(new Stub(myMemberId.getInetAddress(), 2033, 20), false); + assertTrue(mbr != null); + assertEquals(mbr, myMemberId); } /** http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java index ffd5092..78c462f 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/tcp/ConnectionJUnitTest.java @@ -22,7 +22,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.verify; import java.io.InputStream; -import java.net.InetSocketAddress; import java.net.Socket; import org.junit.Test; @@ -62,7 +61,7 @@ public class ConnectionJUnitTest { when(stopper.cancelInProgress()).thenReturn(null); when(conduit.getCancelCriterion()).thenReturn(stopper); - when(conduit.getId()).thenReturn(new InetSocketAddress(SocketCreator.getLocalHost(), 10337)); + when(conduit.getId()).thenReturn(new Stub(SocketCreator.getLocalHost(), 10337, 1)); // NIO can't be mocked because SocketChannel has a final method that // is used by Connection - configureBlocking
