Revert "Removing TCPConduit's Stub ID class" This reverts commit 5b35e43f93bfbf6d62eadf7979eb3a8b7f59b77e.
This commit was causing compilation failures. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/507f2f3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/507f2f3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/507f2f3a Branch: refs/heads/develop Commit: 507f2f3a905e70fcabed9b83d4dc966ef3e9e6ec Parents: f7670e1 Author: Dan Smith <[email protected]> Authored: Fri Dec 11 16:49:38 2015 -0800 Committer: Dan Smith <[email protected]> Committed: Fri Dec 11 17:25:39 2015 -0800 ---------------------------------------------------------------------- .../internal/DistributionManager.java | 8 + .../distributed/internal/StartupMessage.java | 1 + .../internal/direct/DirectChannel.java | 93 ++++++- .../internal/direct/MissingStubException.java | 37 +++ .../internal/direct/ShunnedMemberException.java | 34 --- .../internal/membership/MembershipManager.java | 29 +- .../gms/mgr/GMSMembershipManager.java | 197 +++++++++++-- .../internal/i18n/ParentLocalizedStrings.java | 6 +- .../gemfire/internal/tcp/Connection.java | 117 ++++---- .../gemfire/internal/tcp/ConnectionTable.java | 91 +++--- .../internal/tcp/MemberShunnedException.java | 7 +- .../gemfire/internal/tcp/ServerDelegate.java | 5 +- .../com/gemstone/gemfire/internal/tcp/Stub.java | 164 +++++++++++ .../gemfire/internal/tcp/TCPConduit.java | 274 ++++++++++++++++--- .../internal/DistributionManagerDUnitTest.java | 6 +- .../gms/mgr/GMSMembershipManagerJUnitTest.java | 31 ++- .../internal/tcp/ConnectionJUnitTest.java | 3 +- 17 files changed, 870 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java index e3c342a..964845c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java @@ -91,6 +91,7 @@ import com.gemstone.gemfire.internal.sequencelog.MembershipLogger; import com.gemstone.gemfire.internal.tcp.Connection; import com.gemstone.gemfire.internal.tcp.ConnectionTable; import com.gemstone.gemfire.internal.tcp.ReenteredConnectException; +import com.gemstone.gemfire.internal.tcp.Stub; import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock; /** @@ -2714,6 +2715,13 @@ public class DistributionManager return false; // no peers, we are alone. } + // ensure we have stubs for everyone else + Iterator it = allOthers.iterator(); + while (it.hasNext()) { + InternalDistributedMember member = (InternalDistributedMember)it.next(); + membershipManager.getStubForMember(member); + } + try { ok = op.sendStartupMessage(allOthers, STARTUP_TIMEOUT, equivs, redundancyZone, enforceUniqueZone()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java index 01f8c62..96f8b60 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java @@ -37,6 +37,7 @@ import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributes import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.tcp.Stub; /** * A message that is sent to all other distribution manager when http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java index d4df3bf..14ff923 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java @@ -38,7 +38,6 @@ import com.gemstone.gemfire.InternalGemFireException; import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.ToDataException; import com.gemstone.gemfire.cache.TimeoutException; -import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.DMStats; @@ -65,6 +64,7 @@ import com.gemstone.gemfire.internal.tcp.Connection; import com.gemstone.gemfire.internal.tcp.ConnectionException; import com.gemstone.gemfire.internal.tcp.MemberShunnedException; import com.gemstone.gemfire.internal.tcp.MsgStreamer; +import com.gemstone.gemfire.internal.tcp.Stub; import com.gemstone.gemfire.internal.tcp.TCPConduit; import com.gemstone.gemfire.internal.util.Breadcrumbs; import com.gemstone.gemfire.internal.util.concurrent.ReentrantSemaphore; @@ -115,6 +115,13 @@ public class DirectChannel { } /** + * Returns the endpoint ID for the direct channel + */ + public Stub getLocalStub() { + return conduit.getId(); + } + + /** * when the initial number of members is known, this method is invoked * to ensure that connections to those members can be established in a * reasonable amount of time. See bug 39848 @@ -174,7 +181,7 @@ public class DirectChannel { this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); logger.info(LocalizedMessage.create( - LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getLocalAddr())); + LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getId())); } catch (ConnectionException ce) { @@ -185,6 +192,48 @@ public class DirectChannel { } +// /** +// * +// * @param addr destination for the message +// * @param stubMap map containing all the stubs +// * @param msg the original message +// * @param msgBuf the serialized message +// * @param directAck true if we need an ack +// * @param processorType the type (serialized, etc.) +// * @return if directAck, the Connection that needs the acknowledgment +// * @throws MissingStubException if we do not have a Stub for the recipient +// * @throws IOException if the message could not be sent +// */ +// private Connection attemptSingleSend(MembershipManager mgr, +// InternalDistributedMember addr, +// DistributionMessage msg, ByteBuffer msgBuf, +// boolean directAck, int processorType) +// throws MissingStubException, IOException +// { +// if (!msg.deliverToSender() && localAddr.equals(addr)) +// return null; + +// if (addr == null) +// return null; +// Stub dest = mgr.getStubForMember(addr); +// if (dest == null) { +// // This should only happen if the member is no longer in the view. +// Assert.assertTrue(!mgr.memberExists(addr)); +// throw new MissingStubException("No stub"); +// } +// try { +// msgBuf.position(0); // fix for bug#30680 +// Connection con = conduit.sendSync(dest, msgBuf, processorType, msg); +// if (directAck) +// return con; +// else +// return null; +// } +// catch(IOException t) { +// throw t; +// } +// } + /** * Return how many concurrent operations should be allowed by default. * since 6.6, this has been raised to Integer.MAX value from the number @@ -590,13 +639,22 @@ public class DirectChannel { continue; } - if (!mgr.memberExists(destination) || mgr.shutdownInProgress() || mgr.isShunned(destination)) { + Stub stub = mgr.getStubForMember(destination); + if (stub == null) { // This should only happen if the member is no longer in the view. if (logger.isTraceEnabled(LogMarker.DM)) { - logger.trace(LogMarker.DM, "Not a member: {}", destination); + logger.trace(LogMarker.DM, "No Stub for {}", destination); } + // The only time getStubForMember returns null is if we are + // shunning that member or we are shutting down. + // So the following assertion is wrong: + //Assert.assertTrue(!mgr.memberExists(destination)); + // instead we should: + // Assert.assertTrue(mgr.shutdownInProgress() || mgr.isShunned(destination)); + //but this is not worth doing and isShunned is not public. + // SO the assert has been deadcoded. if (ce == null) ce = new ConnectExceptions(); - ce.addFailure(destination, new ShunnedMemberException(LocalizedStrings.DirectChannel_SHUNNING_0.toLocalizedString())); + ce.addFailure(destination, new MissingStubException(LocalizedStrings.DirectChannel_NO_STUB_0.toLocalizedString())); } else { try { @@ -604,8 +662,8 @@ public class DirectChannel { if (ackTimeout > 0) { startTime = System.currentTimeMillis(); } - Connection con = conduit.getConnection(destination, preserveOrder, - retry, startTime, ackTimeout, ackSDTimeout); + Connection con = conduit.getConnection(destination, stub, + preserveOrder, retry, startTime, ackTimeout, ackSDTimeout); con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657 cons.add(con); @@ -765,7 +823,7 @@ public class DirectChannel { } - public void receive(DistributionMessage msg, int bytesRead) { + public void receive(DistributionMessage msg, int bytesRead, Stub connId) { if (disconnected) { return; } @@ -786,6 +844,10 @@ public class DirectChannel { } } +// public void newMemberConnected(InternalDistributedMember member, Stub id) { +// receiver.newMemberConnected(member, id); +// } + public InternalDistributedMember getLocalAddress() { return this.localAddr; } @@ -868,6 +930,13 @@ public class DirectChannel { } } + /** Create a TCPConduit stub from a JGroups InternalDistributedMember */ + public Stub createConduitStub(InternalDistributedMember addr) { + int port = addr.getDirectChannelPort(); + Stub stub = new Stub(addr.getInetAddress(), port, addr.getVmViewId()); + return stub; + } + public void closeEndpoint(InternalDistributedMember member, String reason) { closeEndpoint(member, reason, true); } @@ -879,7 +948,7 @@ public class DirectChannel { public void closeEndpoint(InternalDistributedMember member, String reason, boolean notifyDisconnect) { TCPConduit tc = this.conduit; if (tc != null) { - tc.removeEndpoint(member, reason, notifyDisconnect); + tc.removeEndpoint(createConduitStub(member), reason, notifyDisconnect); } } @@ -893,7 +962,7 @@ public class DirectChannel { * the map to add the state to * @since 5.1 */ - public void getChannelStates(DistributedMember member, Map result) + public void getChannelStates(Stub member, Map result) { TCPConduit tc = this.conduit; if (tc != null) { @@ -905,7 +974,7 @@ public class DirectChannel { * wait for the given connections to process the number of messages * associated with the connection in the given map */ - public void waitForChannelState(DistributedMember member, Map channelState) + public void waitForChannelState(Stub member, Map channelState) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); @@ -918,7 +987,7 @@ public class DirectChannel { /** * returns true if there are still receiver threads for the given member */ - public boolean hasReceiversFor(DistributedMember mbr) { + public boolean hasReceiversFor(Stub mbr) { return this.conduit.hasReceiversFor(mbr); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java new file mode 100644 index 0000000..49b4486 --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.distributed.internal.direct; + +import com.gemstone.gemfire.GemFireCheckedException; + +/** + * Exception thrown when the TCPConduit is unable to acquire a stub + * for the given recipient. + * + * @author jpenney + * + */ +public class MissingStubException extends GemFireCheckedException +{ + + private static final long serialVersionUID = -6455664684151074915L; + + public MissingStubException(String msg) { + super(msg); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java deleted file mode 100644 index 59db762..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.distributed.internal.direct; - -import com.gemstone.gemfire.GemFireCheckedException; - -/** - * Exception thrown when a member is no longer in the distributed system - * - */ -public class ShunnedMemberException extends GemFireCheckedException -{ - - private static final long serialVersionUID = -6455664684151074915L; - - public ShunnedMemberException(String msg) { - super(msg); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java index 7416efa..a46680b 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java @@ -27,6 +27,7 @@ import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.internal.logging.InternalLogWriter; +import com.gemstone.gemfire.internal.tcp.Stub; /** * A MembershipManager is responsible for reporting a MemberView, as well as @@ -73,7 +74,7 @@ public interface MembershipManager { * @param m the member * @return true if it still exists */ - public boolean memberExists(DistributedMember m); + public boolean memberExists(InternalDistributedMember m); /** * Is this manager still connected? If it has not been initialized, this @@ -142,6 +143,25 @@ public interface MembershipManager { throws NotSerializableException; /** + * Return a {@link Stub} referring to the given member. A <em>null</em> may + * be returned if the system is not employing stubs for communication. + * + * @param m the member + * @return the stub + */ + public Stub getStubForMember(InternalDistributedMember m); + + /** + * Return a {@link InternalDistributedMember} associated with the given Stub. This + * method may return a null if Stubs are not being used. + * @param s Stub to look up + * @param validated true if member must be in the current view + * @return the member associated with the given stub, if any + */ + public InternalDistributedMember getMemberForStub(Stub s, boolean validated); + + + /** * Indicates to the membership manager that the system is shutting down. * Typically speaking, this means that new connection attempts are to be * ignored and disconnect failures are to be (more) tolerated. @@ -266,7 +286,7 @@ public interface MembershipManager { */ public void warnShun(DistributedMember mbr); - public boolean addSurpriseMember(DistributedMember mbr); + public boolean addSurpriseMember(DistributedMember mbr, Stub stub); /** if a StartupMessage is going to reject a new member, this should be used * to make sure we don't keep that member on as a "surprise member" @@ -287,11 +307,6 @@ public interface MembershipManager { * @return true if the member is a surprise member */ public boolean isSurpriseMember(DistributedMember m); - - /** - * Returns true if the member is being shunned - */ - public boolean isShunned(DistributedMember m); /** * Forces use of UDP for communications in the current thread. UDP is http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index 7be0a3a..0b7a544 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -94,6 +94,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LogMarker; import com.gemstone.gemfire.internal.shared.StringPrintWriter; import com.gemstone.gemfire.internal.tcp.ConnectExceptions; import com.gemstone.gemfire.internal.tcp.MemberShunnedException; +import com.gemstone.gemfire.internal.tcp.Stub; import com.gemstone.gemfire.internal.util.Breadcrumbs; public class GMSMembershipManager implements MembershipManager, Manager @@ -155,6 +156,7 @@ public class GMSMembershipManager implements MembershipManager, Manager boolean crashed; String reason; DistributionMessage dmsg; + Stub stub; NetView gmsView; @Override @@ -163,7 +165,7 @@ public class GMSMembershipManager implements MembershipManager, Manager sb.append("kind="); switch (kind) { case SURPRISE_CONNECT: - sb.append("connect; member = <" + member + ">"); + sb.append("connect; member = <" + member + ">; stub = " + stub); break; case VIEW: String text = gmsView.toString(); @@ -182,10 +184,12 @@ public class GMSMembershipManager implements MembershipManager, Manager /** * Create a surprise connect event * @param member the member connecting + * @param id the stub */ - StartupEvent(final InternalDistributedMember member) { + StartupEvent(final InternalDistributedMember member, final Stub id) { this.kind = SURPRISE_CONNECT; this.member = member; + this.stub = id; } /** * Indicate if this is a surprise connect event @@ -278,6 +282,24 @@ public class GMSMembershipManager implements MembershipManager, Manager volatile boolean hasJoined; /** + * a map keyed on InternalDistributedMember, values are Stubs that represent direct + * channels to other systems + * + * Accesses must be under the read or write lock of {@link #latestViewLock}. + */ + protected final Map<InternalDistributedMember, Stub> memberToStubMap = + new ConcurrentHashMap<InternalDistributedMember, Stub>(); + + /** + * a map of direct channels (Stub) to InternalDistributedMember. key instanceof Stub + * value instanceof InternalDistributedMember + * + * Accesses must be under the read or write lock of {@link #latestViewLock}. + */ + protected final Map<Stub, InternalDistributedMember> stubToMemberMap = + new ConcurrentHashMap<Stub, InternalDistributedMember>(); + + /** * Members of the distributed system that we believe have shut down. * Keys are instances of {@link InternalDistributedMember}, values are * Longs indicating the time this member was shunned. @@ -525,6 +547,12 @@ public class GMSMembershipManager implements MembershipManager, Manager } } + // fix for bug #42006, lingering old identity + Object oldStub = this.memberToStubMap.remove(m); + if (oldStub != null) { + this.stubToMemberMap.remove(oldStub); + } + if (shutdownInProgress()) { addShunnedMember(m); continue; // no additions processed after shutdown begins @@ -778,6 +806,9 @@ public class GMSMembershipManager implements MembershipManager, Manager if (directChannel != null) { directChannel.setLocalAddr(address); + Stub stub = directChannel.getLocalStub(); + memberToStubMap.put(address, stub); + stubToMemberMap.put(stub, address); } this.hasJoined = true; @@ -874,15 +905,17 @@ public class GMSMembershipManager implements MembershipManager, Manager /** * Process a surprise connect event, or place it on the startup queue. * @param member the member + * @param stub its stub */ protected void handleOrDeferSurpriseConnect(InternalDistributedMember member) { + Stub stub = new Stub(member.getInetAddress(), member.getDirectChannelPort(), member.getVmViewId()); synchronized (startupLock) { if (!processingEvents) { - startupMessages.add(new StartupEvent(member)); + startupMessages.add(new StartupEvent(member, stub)); return; } } - processSurpriseConnect(member); + processSurpriseConnect(member, stub); } public void startupMessageFailed(DistributedMember mbr, String failureMessage) { @@ -908,9 +941,12 @@ public class GMSMembershipManager implements MembershipManager, Manager * been added, simply returns; else adds the member. * * @param dm the member joining + * @param stub the member's stub */ - public boolean addSurpriseMember(DistributedMember dm) { + public boolean addSurpriseMember(DistributedMember dm, + Stub stub) { final InternalDistributedMember member = (InternalDistributedMember)dm; + Stub s = null; boolean warn = false; latestViewLock.writeLock().lock(); @@ -973,6 +1009,16 @@ public class GMSMembershipManager implements MembershipManager, Manager startCleanupTimer(); } // cleanupTimer == null + // fix for bug #42006, lingering old identity + Object oldStub = this.memberToStubMap.remove(member); + if (oldStub != null) { + this.stubToMemberMap.remove(oldStub); + } + + s = stub == null ? getStubForMember(member) : stub; + // Make sure that channel information is consistent + addChannel(member, s); + // Ensure that the member is accounted for in the view // Conjure up a new view including the new member. This is necessary // because we are about to tell the listener about a new member, so @@ -1108,7 +1154,7 @@ public class GMSMembershipManager implements MembershipManager, Manager // If it's a new sender, wait our turn, generate the event if (isNew) { - shunned = !addSurpriseMember(m); + shunned = !addSurpriseMember(m, getStubForMember(m)); } // isNew } @@ -1120,7 +1166,7 @@ public class GMSMembershipManager implements MembershipManager, Manager if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging warnShun(m); logger.info("Membership: Ignoring message from shunned member <{}>:{}", m, msg); - throw new MemberShunnedException(m); + throw new MemberShunnedException(getStubForMember(m)); } listener.messageReceived(msg); @@ -1202,11 +1248,13 @@ public class GMSMembershipManager implements MembershipManager, Manager * grabbed a stable view if this is really a new member. * * @param member + * @param stub */ private void processSurpriseConnect( - InternalDistributedMember member) + InternalDistributedMember member, + Stub stub) { - addSurpriseMember(member); + addSurpriseMember(member, stub); } /** @@ -1228,7 +1276,7 @@ public class GMSMembershipManager implements MembershipManager, Manager processView(o.gmsView.getViewId(), o.gmsView); } else if (o.isSurpriseConnect()) { // connect - processSurpriseConnect(o.member); + processSurpriseConnect(o.member, o.stub); } else // sanity @@ -1402,7 +1450,7 @@ public class GMSMembershipManager implements MembershipManager, Manager } } - public boolean memberExists(DistributedMember m) { + public boolean memberExists(InternalDistributedMember m) { latestViewLock.readLock().lock(); NetView v = latestView; latestViewLock.readLock().unlock(); @@ -1477,6 +1525,12 @@ public class GMSMembershipManager implements MembershipManager, Manager directChannel.emergencyClose(); } + // could we guarantee not to allocate objects? We're using Darrel's + // factory, so it's possible that an unsafe implementation could be + // introduced here. +// stubToMemberMap.clear(); +// memberToStubMap.clear(); + if (DEBUG) { System.err.println("DEBUG: done closing GroupMembershipService"); } @@ -1713,7 +1767,7 @@ public class GMSMembershipManager implements MembershipManager, Manager allDestinations = true; latestViewLock.writeLock().lock(); try { - List<InternalDistributedMember> keySet = latestView.getMembers(); + Set keySet = memberToStubMap.keySet(); keys = new InternalDistributedMember[keySet.size()]; keys = (InternalDistributedMember[])keySet.toArray(keys); } finally { @@ -1966,6 +2020,80 @@ public class GMSMembershipManager implements MembershipManager, Manager // not currently supported by this manager } + /** + * Get or create stub for given member + */ + public Stub getStubForMember(InternalDistributedMember m) + { + if (shutdownInProgress) { + throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause()); + } + + if (services.getConfig().getDistributionConfig().getDisableTcp()) { + return new Stub(m.getInetAddress(), m.getPort(), m.getVmViewId()); + } + + // Return existing one if it is already in place + Stub result; + result = (Stub)memberToStubMap.get(m); + if (result != null) + return result; + + latestViewLock.writeLock().lock(); + try { + // Do all of this work in a critical region to prevent + // members from slipping in during shutdown + if (shutdownInProgress()) + return null; // don't try to create a stub during shutdown + if (isShunned(m)) + return null; // don't let zombies come back to life + + // OK, create one. Update the table to reflect the creation. + result = directChannel.createConduitStub(m); + addChannel(m, result); + } finally { + latestViewLock.writeLock().unlock(); + } + return result; + } + + public InternalDistributedMember getMemberForStub(Stub s, boolean validated) + { + latestViewLock.writeLock().lock(); + try { + if (shutdownInProgress) { + throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause()); + } + InternalDistributedMember result = (InternalDistributedMember) + stubToMemberMap.get(s); + if (result != null) { + if (validated && !this.latestView.contains(result)) { + // Do not return this member unless it is in the current view. + if (!surpriseMembers.containsKey(result)) { + // if not a surprise member, this stub is lingering and should be removed + stubToMemberMap.remove(s); + memberToStubMap.remove(result); + } + result = null; + // fall through to see if there is a newer member using the same direct port + } + } + if (result == null) { + // it may have not been added to the stub->idm map yet, so check the current view + for (InternalDistributedMember idm: latestView.getMembers()) { + if (GMSUtil.compareAddresses(idm.getInetAddress(), s.getInetAddress()) == 0 + && idm.getDirectChannelPort() == s.getPort()) { + addChannel(idm, s); + return idm; + } + } + } + return result; + } finally { + latestViewLock.writeLock().unlock(); + } + } + public void setShutdown() { latestViewLock.writeLock().lock(); @@ -1981,6 +2109,24 @@ public class GMSMembershipManager implements MembershipManager, Manager return shutdownInProgress || (dm != null && dm.shutdownInProgress()); } + /** + * Add a mapping from the given member to the given stub. Must + * be called with {@link #latestViewLock} held. + * + * @param member + * @param theChannel + */ + protected void addChannel(InternalDistributedMember member, Stub theChannel) + { + if (theChannel != null) { + // Don't overwrite existing stub information with a null + this.memberToStubMap.put(member, theChannel); + + // Can't create reverse mapping if the stub is null + this.stubToMemberMap.put(theChannel, member); + } + } + /** * Clean up and create consistent new view with member removed. @@ -1991,6 +2137,12 @@ public class GMSMembershipManager implements MembershipManager, Manager protected void destroyMember(final InternalDistributedMember member, boolean crashed, final String reason) { + // Clean up the maps + Stub theChannel = (Stub)memberToStubMap.remove(member); + if (theChannel != null) { + this.stubToMemberMap.remove(theChannel); + } + // Make sure it is removed from the view latestViewLock.writeLock().lock(); try { @@ -2213,11 +2365,12 @@ public class GMSMembershipManager implements MembershipManager, Manager /* non-thread-owned serial channels and high priority channels are not * included */ - public Map getChannelStates(DistributedMember member, boolean includeMulticast) { + public Map getMessageState(DistributedMember member, boolean includeMulticast) { Map result = new HashMap(); + Stub stub = (Stub)memberToStubMap.get(member); DirectChannel dc = directChannel; - if (dc != null) { - dc.getChannelStates(member, result); + if (stub != null && dc != null) { + dc.getChannelStates(stub, result); } services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast); return result; @@ -2228,8 +2381,15 @@ public class GMSMembershipManager implements MembershipManager, Manager { if (Thread.interrupted()) throw new InterruptedException(); DirectChannel dc = directChannel; - if (dc != null) { - dc.waitForChannelState(otherMember, channelState); + Stub stub; + latestViewLock.writeLock().lock(); + try { + stub = (Stub)memberToStubMap.get(otherMember); + } finally { + latestViewLock.writeLock().unlock(); + } + if (dc != null && stub != null) { + dc.waitForChannelState(stub, state); } services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state); } @@ -2245,6 +2405,7 @@ public class GMSMembershipManager implements MembershipManager, Manager boolean result = false; DirectChannel dc = directChannel; InternalDistributedMember idm = (InternalDistributedMember)mbr; + Stub stub = new Stub(idm.getInetAddress(), idm.getPort(), idm.getVmViewId()); int memberTimeout = this.services.getConfig().getDistributionConfig().getMemberTimeout(); long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10; boolean wait; @@ -2252,7 +2413,7 @@ public class GMSMembershipManager implements MembershipManager, Manager do { wait = false; if (dc != null) { - if (dc.hasReceiversFor(idm)) { + if (dc.hasReceiversFor(stub)) { wait = true; } if (wait && logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java index 780fe18..7bb97b9 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java @@ -1109,7 +1109,7 @@ class ParentLocalizedStrings { public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED = new StringId(2086, "Ending reconnect attempt because {0} has disappeared."); public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED = new StringId(2087, "Ending reconnect attempt to {0} because shutdown has started."); public static final StringId TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1 = new StringId(2088, "Error sending message to {0} (will reattempt): {1}"); - public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket on port {0} with address {1}"); + public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket and Stub on port {0} with address {1}"); public static final StringId TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT = new StringId(2090, "exception parsing p2p.idleConnectionTimeout"); public static final StringId TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE = new StringId(2091, "exception parsing p2p.tcpBufferSize"); public static final StringId TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1 = new StringId(2092, "Failed to accept connection from {0} because {1}"); @@ -1444,7 +1444,7 @@ class ParentLocalizedStrings { public static final StringId Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1 = new StringId(2432, "Detected wrong version of GemFire product during handshake. Expected {0} but found {1}"); public static final StringId Connection_FORCED_DISCONNECT_SENT_TO_0 = new StringId(2433, "Forced disconnect sent to {0}"); public static final StringId Connection_HANDSHAKE_FAILED = new StringId(2434, "Handshake failed"); - public static final StringId Connection_MEMBER_LEFT_THE_GROUP = new StringId(2435, "Member {0} left the group"); + public static final StringId Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP = new StringId(2435, "Member for stub {0} left the group"); public static final StringId Connection_NOT_CONNECTED_TO_0 = new StringId(2436, "Not connected to {0}"); public static final StringId Connection_NULL_CONNECTIONTABLE = new StringId(2437, "Null ConnectionTable"); public static final StringId Connection_SOCKET_HAS_BEEN_CLOSED = new StringId(2438, "socket has been closed"); @@ -1542,7 +1542,7 @@ class ParentLocalizedStrings { public static final StringId DefaultQuery_WHEN_QUERYING_A_PARTITIONED_REGION_THE_PROJECTIONS_MUST_NOT_REFERENCE_ANY_REGIONS = new StringId(2530, "When querying a Partitioned Region, the projections must not reference any regions"); public static final StringId DestroyMessage_FAILED_SENDING_0 = new StringId(2531, "Failed sending < {0} >"); public static final StringId DirectChannel_COMMUNICATIONS_DISCONNECTED = new StringId(2532, "communications disconnected"); - public static final StringId DirectChannel_SHUNNING_0 = new StringId(2533, "Member is being shunned: {0}"); + public static final StringId DirectChannel_NO_STUB_0 = new StringId(2533, "No stub {0}"); public static final StringId DirectChannel_UNKNOWN_ERROR_SERIALIZING_MESSAGE = new StringId(2534, "Unknown error serializing message"); public static final StringId DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING = new StringId(2535, "An IOException was thrown while serializing."); public static final StringId DiskEntry_DISK_REGION_IS_NULL = new StringId(2536, "Disk region is null"); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java index 74660da..f918812 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java @@ -50,7 +50,6 @@ import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.cache.CacheClosedException; -import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.ConflationKey; import com.gemstone.gemfire.distributed.internal.DM; @@ -73,6 +72,7 @@ import com.gemstone.gemfire.internal.Assert; import com.gemstone.gemfire.internal.ByteArrayDataInput; import com.gemstone.gemfire.internal.DSFIDFactory; import com.gemstone.gemfire.internal.InternalDataSerializer; +import com.gemstone.gemfire.internal.SocketCloser; import com.gemstone.gemfire.internal.SocketCreator; import com.gemstone.gemfire.internal.SocketUtils; import com.gemstone.gemfire.internal.SystemTimer; @@ -222,6 +222,11 @@ public class Connection implements Runnable { /** the ID string of the conduit (for logging) */ String conduitIdStr; + /** remoteId identifies the remote conduit's listener. It does NOT + identify the "port" that this connection's socket is attached + to, which is a different thing altogether */ + Stub remoteId; + /** Identifies the java group member on the other side of the connection. */ InternalDistributedMember remoteAddr; @@ -796,7 +801,7 @@ public class Connection implements Runnable { } if (success) { if (this.isReceiver) { - needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr); + needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr, this.remoteId); if (needToClose) { reason = "this member is shunned"; } @@ -840,7 +845,7 @@ public class Connection implements Runnable { * @param beingSick */ private void asyncClose(boolean beingSick) { - // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake + // note: remoteId may be null if this is a receiver that hasn't finished its handshake // we do the close in a background thread because the operation may hang if // there is a problem with the network. See bug #46659 @@ -1013,7 +1018,8 @@ public class Connection implements Runnable { protected static Connection createSender(final MembershipManager mgr, final ConnectionTable t, final boolean preserveOrder, - final DistributedMember remoteAddr, + final Stub key, + final InternalDistributedMember remoteAddr, final boolean sharedResource, final long startTime, final long ackTimeout, @@ -1068,8 +1074,9 @@ public class Connection implements Runnable { } if (firstTime) { firstTime = false; - if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress()) { - throw new IOException("Member " + remoteAddr + " left the system"); + InternalDistributedMember m = mgr.getMemberForStub(key, true); + if (m == null) { + throw new IOException("Member for stub " + key + " left the group"); } } else { @@ -1077,7 +1084,7 @@ public class Connection implements Runnable { // alert listener should not prevent cache operations from continuing if (AlertAppender.isThreadAlerting()) { // do not change the text of this exception - it is looked for in exception handlers - throw new IOException("Cannot form connection to alert listener " + remoteAddr); + throw new IOException("Cannot form connection to alert listener " + key); } // Wait briefly... @@ -1090,19 +1097,20 @@ public class Connection implements Runnable { t.getConduit().getCancelCriterion().checkCancelInProgress(ie); } t.getConduit().getCancelCriterion().checkCancelInProgress(null); - if (giveUpOnMember(mgr, remoteAddr)) { - throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr)); + InternalDistributedMember m = mgr.getMemberForStub(key, true); + if (m == null) { + throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key)); } if (!warningPrinted) { warningPrinted = true; - logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, remoteAddr)); + logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, m)); } t.getConduit().stats.incReconnectAttempts(); } //create connection try { conn = null; - conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource); + conn = new Connection(mgr, t, preserveOrder, key, remoteAddr, sharedResource); } catch (javax.net.ssl.SSLHandshakeException se) { // no need to retry if certificates were rejected @@ -1110,7 +1118,8 @@ public class Connection implements Runnable { } catch (IOException ioe) { // Only give up if the member leaves the view. - if (giveUpOnMember(mgr, remoteAddr)) { + InternalDistributedMember m = mgr.getMemberForStub(key, true); + if (m == null) { throw ioe; } t.getConduit().getCancelCriterion().checkCancelInProgress(null); @@ -1121,7 +1130,7 @@ public class Connection implements Runnable { connectionErrorLogged = true; // otherwise change to use 100ms intervals causes a lot of these logger.info(LocalizedMessage.create( LocalizedStrings.Connection_CONNECTION_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, - new Object[] {sharedResource, preserveOrder, remoteAddr, ioe})); + new Object[] {sharedResource, preserveOrder, m, ioe})); } } // IOException finally { @@ -1137,8 +1146,9 @@ public class Connection implements Runnable { // something went wrong while reading the handshake // and the socket was closed or this guy sent us a // ShutdownMessage - if (giveUpOnMember(mgr, remoteAddr)) { - throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr)); + InternalDistributedMember m = mgr.getMemberForStub(key, true); + if (m == null) { + throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key)); } t.getConduit().getCancelCriterion().checkCancelInProgress(null); // no success but no need to log; just retry @@ -1151,7 +1161,8 @@ public class Connection implements Runnable { throw e; } catch (ConnectionException e) { - if (giveUpOnMember(mgr, remoteAddr)) { + InternalDistributedMember m = mgr.getMemberForStub(key, true); + if (m == null) { IOException ioe = new IOException(LocalizedStrings.Connection_HANDSHAKE_FAILED.toLocalizedString()); ioe.initCause(e); throw ioe; @@ -1159,16 +1170,17 @@ public class Connection implements Runnable { t.getConduit().getCancelCriterion().checkCancelInProgress(null); logger.info(LocalizedMessage.create( LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, - new Object[] {sharedResource, preserveOrder, remoteAddr ,e})); + new Object[] {sharedResource, preserveOrder, m,e})); } catch (IOException e) { - if (giveUpOnMember(mgr, remoteAddr)) { + InternalDistributedMember m = mgr.getMemberForStub(key, true); + if (m == null) { throw e; } t.getConduit().getCancelCriterion().checkCancelInProgress(null); logger.info(LocalizedMessage.create( LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, - new Object[] {sharedResource, preserveOrder, remoteAddr ,e})); + new Object[] {sharedResource, preserveOrder, m,e})); if (!sharedResource && "Too many open files".equals(e.getMessage())) { t.fileDescriptorsExhausted(); } @@ -1208,7 +1220,7 @@ public class Connection implements Runnable { if (conn == null) { throw new ConnectionException( LocalizedStrings.Connection_CONNECTION_FAILED_CONSTRUCTION_FOR_PEER_0 - .toLocalizedString(remoteAddr)); + .toLocalizedString(mgr.getMemberForStub(key, true))); } if (preserveOrder && BATCH_SENDS) { conn.createBatchSendBuffer(); @@ -1216,15 +1228,12 @@ public class Connection implements Runnable { conn.finishedConnecting = true; return conn; } - - private static boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) { - return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress(); - } - private void setRemoteAddr(DistributedMember m) { + private void setRemoteAddr(InternalDistributedMember m, Stub stub) { this.remoteAddr = this.owner.getDM().getCanonicalId(m); + this.remoteId = stub; MembershipManager mgr = this.owner.owner.getMembershipManager(); - mgr.addSurpriseMember(m); + mgr.addSurpriseMember(m, stub); } /** creates a new connection to a remote server. @@ -1234,11 +1243,11 @@ public class Connection implements Runnable { private Connection(MembershipManager mgr, ConnectionTable t, boolean preserveOrder, - DistributedMember remoteID, + Stub key, + InternalDistributedMember remoteAddr, boolean sharedResource) throws IOException, DistributedSystemDisconnectedException { - InternalDistributedMember remoteAddr = (InternalDistributedMember)remoteID; if (t == null) { throw new IllegalArgumentException(LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString()); } @@ -1246,7 +1255,7 @@ public class Connection implements Runnable { this.owner = t; this.sharedResource = sharedResource; this.preserveOrder = preserveOrder; - setRemoteAddr(remoteAddr); + setRemoteAddr(remoteAddr, key); this.conduitIdStr = this.owner.getConduit().getId().toString(); this.handshakeRead = false; this.handshakeCancelled = false; @@ -1256,7 +1265,7 @@ public class Connection implements Runnable { // connect to listening socket - InetSocketAddress addr = new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort()); + InetSocketAddress addr = new InetSocketAddress(remoteId.getInetAddress(), remoteId.getPort()); if (useNIO()) { SocketChannel channel = SocketChannel.open(); this.owner.addConnectingSocket(channel.socket(), addr.getAddress()); @@ -1316,15 +1325,15 @@ public class Connection implements Runnable { else { if (TCPConduit.useSSL) { // socket = javax.net.ssl.SSLSocketFactory.getDefault() - // .createSocket(remoteAddr.getInetAddress(), remoteAddr.getPort()); + // .createSocket(remoteId.getInetAddress(), remoteId.getPort()); int socketBufferSize = sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize; - this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize ); + this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteId.getInetAddress(), remoteId.getPort(), socketBufferSize ); // Set the receive buffer size local fields. It has already been set in the socket. setSocketBufferSize(this.socket, false, socketBufferSize, true); setSendBufferSize(this.socket); } else { - //socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort()); + //socket = new Socket(remoteId.getInetAddress(), remoteId.getPort()); Socket s = new Socket(); this.socket = s; s.setTcpNoDelay(true); @@ -1630,8 +1639,8 @@ public class Connection implements Runnable { // we can't wait for the reader thread when running in an IBM JRE. See // bug 41889 if (this.owner.owner.config.getEnableNetworkPartitionDetection() || - this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE || - this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { + this.owner.owner.getLocalId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE || + this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor")); } { @@ -1680,16 +1689,16 @@ public class Connection implements Runnable { // Only remove endpoint if sender. if (this.finishedConnecting) { // only remove endpoint if our constructor finished - this.owner.removeEndpoint(this.remoteAddr, reason); + this.owner.removeEndpoint(this.remoteId, reason); } } } else { - this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this); + this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this); } } else if (!this.isReceiver) { - this.owner.removeThreadConnection(this.remoteAddr, this); + this.owner.removeThreadConnection(this.remoteId, this); } } else { @@ -1697,10 +1706,10 @@ public class Connection implements Runnable { // has never added this Connection to its maps since // the calls in this block use our identity to do the removes. if (this.sharedResource) { - this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this); + this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this); } else if (!this.isReceiver) { - this.owner.removeThreadConnection(this.remoteAddr, this); + this.owner.removeThreadConnection(this.remoteId, this); } } } @@ -1744,7 +1753,7 @@ public class Connection implements Runnable { } finally { // bug36060: do the socket close within a finally block if (logger.isDebugEnabled()) { - logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr); + logger.debug("Stopping {} for {}", p2pReaderName(), remoteId); } initiateSuspicionIfSharedUnordered(); if (this.isReceiver) { @@ -2329,7 +2338,8 @@ public class Connection implements Runnable { .toLocalizedString(new Object[]{new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)})); } InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis); - setRemoteAddr(remote); + Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId()); + setRemoteAddr(remote, stub); Thread.currentThread().setName(LocalizedStrings.Connection_P2P_MESSAGE_READER_FOR_0.toLocalizedString(this.remoteAddr, this.socket.getPort())); this.sharedResource = dis.readBoolean(); this.preserveOrder = dis.readBoolean(); @@ -2367,7 +2377,7 @@ public class Connection implements Runnable { } if (logger.isDebugEnabled()) { - logger.debug("{} remoteAddr is {} {}", p2pReaderName(), this.remoteAddr, + logger.debug("{} remoteId is {} {}", p2pReaderName(), this.remoteId, (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : "")); } @@ -2545,7 +2555,7 @@ public class Connection implements Runnable { throws IOException, ConnectionException { if (!connected) { - throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteAddr)); + throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteId)); } if (this.batchFlusher != null) { batchSend(buffer); @@ -2768,7 +2778,7 @@ public class Connection implements Runnable { if (this.disconnectRequested) { buffer.position(origBufferPos); // we have given up so just drop this message. - throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteAddr)); + throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteId)); } if (!force && !this.asyncQueuingInProgress) { // reset buffer since we will be sending it. This fixes bug 34832 @@ -2970,7 +2980,7 @@ public class Connection implements Runnable { } DM dm = this.owner.getDM(); if (dm == null) { - this.owner.removeEndpoint(this.remoteAddr, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString()); + this.owner.removeEndpoint(this.remoteId, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString()); return; } dm.getMembershipManager().requestMemberRemoval(this.remoteAddr, @@ -2991,7 +3001,7 @@ public class Connection implements Runnable { return; } } - this.owner.removeEndpoint(this.remoteAddr, + this.owner.removeEndpoint(this.remoteId, LocalizedStrings.Connection_FORCE_DISCONNECT_TIMED_OUT.toLocalizedString()); if (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) { if (logger.isDebugEnabled()) { @@ -3100,7 +3110,7 @@ public class Connection implements Runnable { stats.incAsyncThreads(-1); stats.incAsyncQueues(-1); if (logger.isDebugEnabled()) { - logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteAddr, remoteAddr); + logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteId, remoteAddr); } } } finally { @@ -3827,7 +3837,8 @@ public class Connection implements Runnable { throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)})); } InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis); - setRemoteAddr(remote); + Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId()); + setRemoteAddr(remote, stub); this.sharedResource = dis.readBoolean(); this.preserveOrder = dis.readBoolean(); this.uniqueId = dis.readLong(); @@ -3886,7 +3897,7 @@ public class Connection implements Runnable { return; } if (logger.isDebugEnabled()) { - logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr, + logger.debug("P2P handshake remoteId is {}{}", this.remoteId, (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : "")); } try { @@ -4020,6 +4031,12 @@ public class Connection implements Runnable { this.accessed = true; } + /** returns the ConnectionKey stub representing the other side of + this connection (host:port) */ + public final Stub getRemoteId() { + return remoteId; + } + /** return the DM id of the guy on the other side of this connection. */ public final InternalDistributedMember getRemoteAddress() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java index 3816efe..bac356c 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java @@ -42,7 +42,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.SystemFailure; -import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; @@ -61,7 +60,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; /** <p>ConnectionTable holds all of the Connection objects in a conduit. Connections represent a pipe between two endpoints represented - by generic DistributedMembers.</p> + by generic Stubs.</p> @author Bruce Schuchardt @author Darrel Schneider @@ -346,7 +345,7 @@ public class ConnectionTable { /** * Process a newly created PendingConnection * - * @param id DistributedMember on which the connection is created + * @param id Stub on which the connection is created * @param sharedResource whether the connection is used by multiple threads * @param preserveOrder whether to preserve order * @param m map to add the connection to @@ -358,7 +357,7 @@ public class ConnectionTable { * @throws IOException if unable to connect * @throws DistributedSystemDisconnectedException */ - private Connection handleNewPendingConnection(DistributedMember id, boolean sharedResource, + private Connection handleNewPendingConnection(Stub id, boolean sharedResource, boolean preserveOrder, Map m, PendingConnection pc, long startTime, long ackThreshold, long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException @@ -367,7 +366,7 @@ public class ConnectionTable { Connection con = null; try { con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, - id, + id, this.owner.getMemberForStub(id, false), sharedResource, startTime, ackThreshold, ackSAThreshold); this.owner.stats.incSenders(sharedResource, preserveOrder); @@ -443,7 +442,7 @@ public class ConnectionTable { * unordered or conserve-sockets * note that unordered connections are currently always shared * - * @param id the DistributedMember on which we are creating a connection + * @param id the Stub on which we are creating a connection * @param threadOwnsResources whether unordered conn is owned by the current thread * @param preserveOrder whether to preserve order * @param startTime the ms clock start time for the operation @@ -453,7 +452,7 @@ public class ConnectionTable { * @throws IOException if unable to create the connection * @throws DistributedSystemDisconnectedException */ - private Connection getUnorderedOrConserveSockets(DistributedMember id, + private Connection getUnorderedOrConserveSockets(Stub id, boolean threadOwnsResources, boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException @@ -528,7 +527,7 @@ public class ConnectionTable { * @throws IOException if the connection could not be created * @throws DistributedSystemDisconnectedException */ - Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout, long ackSATimeout) + Connection getOrderedAndOwned(Stub id, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { Connection result = null; @@ -567,7 +566,7 @@ public class ConnectionTable { // OK, we have to create a new connection. result = Connection.createSender(owner.getMembershipManager(), this, true /* preserveOrder */, id, - false /* shared */, + this.owner.getMemberForStub(id, false), false /* shared */, startTime, ackTimeout, ackSATimeout); if (logger.isDebugEnabled()) { logger.debug("ConnectionTable: created an ordered connection: {}", result); @@ -584,7 +583,7 @@ public class ConnectionTable { ArrayList al = (ArrayList)this.threadConnectionMap.get(id); if (al == null) { - // First connection for this DistributedMember. Make sure list for this + // First connection for this Stub. Make sure list for this // stub is created if it isn't already there. al = new ArrayList(); @@ -652,7 +651,7 @@ public class ConnectionTable { /** * Get a new connection - * @param id the DistributedMember on which to create the connection + * @param id the Stub on which to create the connection * @param preserveOrder whether order should be preserved * @param startTime the ms clock start time * @param ackTimeout the ms ack-wait-threshold, or zero @@ -661,7 +660,7 @@ public class ConnectionTable { * @throws java.io.IOException if the connection could not be created * @throws DistributedSystemDisconnectedException */ - protected Connection get(DistributedMember id, boolean preserveOrder, + protected Connection get(Stub id, boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException { @@ -839,38 +838,34 @@ public class ConnectionTable { /** * Return true if our owner already knows that this endpoint is departing */ - protected boolean isEndpointShuttingDown(DistributedMember id) { - return giveUpOnMember(owner.getDM().getMembershipManager(), id); + protected boolean isEndpointShuttingDown(Stub stub) { + return this.owner.getMemberForStub(stub, true) == null; } - protected boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) { - return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress(); - } - /** remove an endpoint and notify the membership manager of the departure */ - protected void removeEndpoint(DistributedMember stub, String reason) { + protected void removeEndpoint(Stub stub, String reason) { removeEndpoint(stub, reason, true); } - protected void removeEndpoint(DistributedMember memberID, String reason, boolean notifyDisconnect) { + protected void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) { if (this.closed) { return; } boolean needsRemoval = false; synchronized (this.orderedConnectionMap) { - if (this.orderedConnectionMap.get(memberID) != null) + if (this.orderedConnectionMap.get(stub) != null) needsRemoval = true; } if (!needsRemoval) { synchronized (this.unorderedConnectionMap) { - if (this.unorderedConnectionMap.get(memberID) != null) + if (this.unorderedConnectionMap.get(stub) != null) needsRemoval = true; } } if (!needsRemoval) { ConcurrentMap cm = this.threadConnectionMap; if (cm != null) { - ArrayList al = (ArrayList)cm.get(memberID); + ArrayList al = (ArrayList)cm.get(stub); needsRemoval = al != null && al.size() > 0; } } @@ -878,14 +873,14 @@ public class ConnectionTable { if (needsRemoval) { InternalDistributedMember remoteAddress = null; synchronized (this.orderedConnectionMap) { - Object c = this.orderedConnectionMap.remove(memberID); + Object c = this.orderedConnectionMap.remove(stub); if (c instanceof Connection) { remoteAddress = ((Connection) c).getRemoteAddress(); } closeCon(reason, c); } synchronized (this.unorderedConnectionMap) { - Object c = this.unorderedConnectionMap.remove(memberID); + Object c = this.unorderedConnectionMap.remove(stub); if (remoteAddress == null && (c instanceof Connection)) { remoteAddress = ((Connection) c).getRemoteAddress(); } @@ -895,7 +890,7 @@ public class ConnectionTable { { ConcurrentMap cm = this.threadConnectionMap; if (cm != null) { - ArrayList al = (ArrayList)cm.remove(memberID); + ArrayList al = (ArrayList)cm.remove(stub); if (al != null) { synchronized (al) { for (Iterator it=al.iterator(); it.hasNext();) { @@ -917,7 +912,7 @@ public class ConnectionTable { for (Iterator it=connectingSockets.entrySet().iterator(); it.hasNext(); ) { Map.Entry entry = (Map.Entry)it.next(); ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue(); - if (info.peerAddress.equals(((InternalDistributedMember)memberID).getInetAddress())) { + if (info.peerAddress.equals(stub.getInetAddress())) { toRemove.add(entry.getKey()); it.remove(); } @@ -930,7 +925,7 @@ public class ConnectionTable { } catch (IOException e) { if (logger.isDebugEnabled()) { - logger.debug("caught exception while trying to close connecting socket for {}", memberID, e); + logger.debug("caught exception while trying to close connecting socket for {}", stub, e); } } } @@ -942,7 +937,7 @@ public class ConnectionTable { synchronized (this.receivers) { for (Iterator it=receivers.iterator(); it.hasNext();) { Connection con = (Connection)it.next(); - if (memberID.equals(con.getRemoteAddress())) { + if (stub.equals(con.getRemoteId())) { it.remove(); toRemove.add(con); } @@ -952,13 +947,10 @@ public class ConnectionTable { Connection con = (Connection)it.next(); closeCon(reason, con); } + // call memberDeparted after doing the closeCon calls + // so it can recursively call removeEndpoint if (notifyDisconnect) { - // Before the removal of TCPConduit Stub addresses this used - // to call MembershipManager.getMemberForStub, which checked - // for a shutdown in progress and threw this exception: - if (owner.getDM().shutdownInProgress()) { - throw new DistributedSystemDisconnectedException("Shutdown in progress", owner.getDM().getMembershipManager().getShutdownCause()); - } + owner.getMemberForStub(stub, false); } if (remoteAddress != null) { @@ -972,11 +964,11 @@ public class ConnectionTable { } /** check to see if there are still any receiver threads for the given end-point */ - protected boolean hasReceiversFor(DistributedMember endPoint) { + protected boolean hasReceiversFor(Stub endPoint) { synchronized (this.receivers) { for (Iterator it=receivers.iterator(); it.hasNext();) { Connection con = (Connection)it.next(); - if (endPoint.equals(con.getRemoteAddress())) { + if (endPoint.equals(con.getRemoteId())) { return true; } } @@ -984,7 +976,7 @@ public class ConnectionTable { return false; } - private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub, Connection c) { + private static void removeFromThreadConMap(ConcurrentMap cm, Stub stub, Connection c) { if (cm != null) { ArrayList al = (ArrayList)cm.get(stub); if (al != null) { @@ -994,7 +986,7 @@ public class ConnectionTable { } } } - protected void removeThreadConnection(DistributedMember stub, Connection c) { + protected void removeThreadConnection(Stub stub, Connection c) { /*if (this.closed) { return; }*/ @@ -1009,7 +1001,7 @@ public class ConnectionTable { } // synchronized } // m != null } - void removeSharedConnection(String reason, DistributedMember stub, boolean ordered, Connection c) { + void removeSharedConnection(String reason, Stub stub, boolean ordered, Connection c) { if (this.closed) { return; } @@ -1062,7 +1054,7 @@ public class ConnectionTable { Iterator it = m.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next(); - DistributedMember stub = (DistributedMember)me.getKey(); + Stub stub = (Stub)me.getKey(); Connection c = (Connection)me.getValue(); removeFromThreadConMap(this.threadConnectionMap, stub, c); it.remove(); @@ -1087,7 +1079,7 @@ public class ConnectionTable { * from being formed or new messages from being sent * @since 5.1 */ - protected void getThreadOwnedOrderedConnectionState(DistributedMember member, + protected void getThreadOwnedOrderedConnectionState(Stub member, Map result) { ConcurrentMap cm = this.threadConnectionMap; @@ -1113,7 +1105,7 @@ public class ConnectionTable { * wait for the given incoming connections to receive at least the associated * number of messages */ - protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member, + protected void waitForThreadOwnedOrderedConnectionState(Stub member, Map connectionStates) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // wisest to do this before the synchronize below List r = null; @@ -1123,14 +1115,14 @@ public class ConnectionTable { for (Iterator it=r.iterator(); it.hasNext();) { Connection con = (Connection)it.next(); if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder() - && member.equals(con.getRemoteAddress())) { + && member.equals(con.getRemoteId())) { Long state = (Long)connectionStates.remove(Long.valueOf(con.getUniqueId())); if (state != null) { long count = state.longValue(); while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) { if (logger.isDebugEnabled()) { logger.debug("Waiting for connection {}/{} currently={} need={}", - con.getRemoteAddress(), con.getUniqueId(), con.getMessagesReceived(), count); + con.getRemoteId(), con.getUniqueId(), con.getMessagesReceived(), count); } Thread.sleep(100); } @@ -1238,11 +1230,11 @@ public class ConnectionTable { /** * the stub we are connecting to */ - private final DistributedMember id; + private final Stub id; private final Thread connectingThread; - public PendingConnection(boolean preserveOrder, DistributedMember id) { + public PendingConnection(boolean preserveOrder, Stub id) { this.preserveOrder = preserveOrder; this.id = id; this.connectingThread = Thread.currentThread(); @@ -1287,9 +1279,10 @@ public class ConnectionTable { boolean severeAlertIssued = false; boolean suspected = false; - DistributedMember targetMember = null; + InternalDistributedMember targetMember = null; if (ackSATimeout > 0) { - targetMember = this.id; + targetMember = + ((GMSMembershipManager)mgr).getMemberForStub(this.id, false); } for (;;) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java index a954814..5cd426f 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java @@ -18,7 +18,6 @@ package com.gemstone.gemfire.internal.tcp; import com.gemstone.gemfire.GemFireException; -import com.gemstone.gemfire.distributed.DistributedMember; /** * MemberShunnedException may be thrown to prevent ack-ing a message @@ -29,13 +28,13 @@ import com.gemstone.gemfire.distributed.DistributedMember; public class MemberShunnedException extends GemFireException { private static final long serialVersionUID = -8453126202477831557L; - private DistributedMember member; + private Stub member; /** * constructor * @param member the member that was shunned */ - public MemberShunnedException(DistributedMember member) { + public MemberShunnedException(Stub member) { super(""); this.member = member; } @@ -43,7 +42,7 @@ public class MemberShunnedException extends GemFireException /** * @return the member that was shunned */ - public DistributedMember getShunnedMember() { + public Stub getShunnedMember() { return this.member; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java index cd711e7..fd495d9 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java @@ -16,7 +16,6 @@ */ package com.gemstone.gemfire.internal.tcp; -import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.distributed.internal.membership.*; import com.gemstone.gemfire.i18n.LogWriterI18n; @@ -35,7 +34,7 @@ import com.gemstone.gemfire.i18n.LogWriterI18n; public interface ServerDelegate { public void receive( DistributionMessage message, int bytesRead, - DistributedMember connId ); + Stub connId ); public LogWriterI18n getLogger(); @@ -43,5 +42,5 @@ public interface ServerDelegate { * Called when a possibly new member is detected by receiving a direct channel * message from him. */ - public void newMemberConnected(InternalDistributedMember member); + public void newMemberConnected(InternalDistributedMember member, Stub id); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/507f2f3a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java new file mode 100644 index 0000000..2e4b91b --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.internal.tcp; + +import java.io.*; +import java.net.*; + +import com.gemstone.gemfire.DataSerializable; +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.internal.InternalDataSerializer; + +/** Stub represents an ip address and port. + + @author Bruce Schuchardt + @since 2.0 + + */ + +public class Stub implements Externalizable, DataSerializable +{ + private InetAddress inAddr; + private int port; + private int viewID; + + public Stub() { + // public default needed for deserialization + } + + public Stub(InetAddress addr, int port, int vmViewID) { + viewID = vmViewID; + inAddr = addr; + this.port = port; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o instanceof Stub) { + Stub s = (Stub)o; + boolean result; + if (inAddr == null) + result = s.inAddr == null; + else + result = inAddr.equals(s.inAddr); + result = result && port == s.port; + if (this.viewID != 0 && s.viewID != 0) { + result = result && (this.viewID == s.viewID); + } + return result; + } + else { + return false; + } + } + + // hashCode equates to the address hashCode for fast connection lookup + @Override + public int hashCode() { + // do not use viewID in hashCode because it is changed after creating a stub + int result = 0; + // result += inAddr.hashCode(); // useless + result += port; + return result; + } + + public void setViewID(int viewID) { + this.viewID = viewID; + } + + public int getPort() { + return port; + } + + public int getViewID() { + return this.viewID; + } + + public InetAddress getInetAddress() { + return inAddr; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(80); + sb.append("tcp://"); + if (inAddr == null) + sb.append("<null>"); + else + sb.append(inAddr.toString()); + if (this.viewID != 0) { + sb.append("<v"+this.viewID+">"); + } + sb.append(":" + port); + return sb.toString(); + } + + /** + * Writes the contents of this <code>Stub</code> to a + * <code>DataOutput</code>. + * + * @since 3.0 + */ + public void toData(DataOutput out) + throws IOException + { + DataSerializer.writeInetAddress(inAddr, out); + out.writeInt(port); + out.writeInt(viewID); + } + + /** + * Reads the contents of this <code>Stub</code> from a + * <code>DataOutput</code>. + * + * @since 3.0 + */ + public void fromData(DataInput in) + throws IOException, ClassNotFoundException + { + inAddr = DataSerializer.readInetAddress(in); + this.port = in.readInt(); + this.viewID = in.readInt(); + } + + /** + * static factory method + * @since 5.0.2 + */ + public static Stub createFromData(DataInput in) + throws IOException, ClassNotFoundException + { + Stub result = new Stub(); + InternalDataSerializer.invokeFromData(result, in); + return result; + } + + public void writeExternal(ObjectOutput os) + throws IOException + { + this.toData(os); + } + + public void readExternal(ObjectInput is) + throws IOException, ClassNotFoundException + { + this.fromData(is); + } +}
