Revert "Revert "Removing TCPConduit's Stub ID class""

This reverts commit 507f2f3a905e70fcabed9b83d4dc966ef3e9e6ec.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4bf4557b Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4bf4557b Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4bf4557b Branch: refs/heads/develop Commit: 4bf4557b2cfe12b9396aeb43dd0b916cbcb98b89 Parents: ec9d16a Author: Bruce Schuchardt <[email protected]> Authored: Tue Dec 15 08:21:27 2015 -0800 Committer: Bruce Schuchardt <[email protected]> Committed: Tue Dec 15 08:21:27 2015 -0800 ---------------------------------------------------------------------- .../internal/DistributionManager.java | 8 - .../distributed/internal/StartupMessage.java | 1 - .../internal/direct/DirectChannel.java | 93 +------ .../internal/direct/MissingStubException.java | 37 --- .../internal/direct/ShunnedMemberException.java | 34 +++ .../internal/membership/MembershipManager.java | 29 +- .../gms/mgr/GMSMembershipManager.java | 197 ++----------- .../internal/i18n/ParentLocalizedStrings.java | 6 +- .../gemfire/internal/tcp/Connection.java | 117 ++++---- .../gemfire/internal/tcp/ConnectionTable.java | 91 +++--- .../internal/tcp/MemberShunnedException.java | 7 +- .../gemfire/internal/tcp/ServerDelegate.java | 5 +- .../com/gemstone/gemfire/internal/tcp/Stub.java | 164 ----------- .../gemfire/internal/tcp/TCPConduit.java | 274 +++---------------- .../internal/DistributionManagerDUnitTest.java | 6 +- .../gms/mgr/GMSMembershipManagerJUnitTest.java | 31 +-- .../internal/tcp/ConnectionJUnitTest.java | 3 +- 17 files changed, 233 insertions(+), 870 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java ---------------------------------------------------------------------- diff --git 
a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java index 964845c..e3c342a 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionManager.java @@ -91,7 +91,6 @@ import com.gemstone.gemfire.internal.sequencelog.MembershipLogger; import com.gemstone.gemfire.internal.tcp.Connection; import com.gemstone.gemfire.internal.tcp.ConnectionTable; import com.gemstone.gemfire.internal.tcp.ReenteredConnectException; -import com.gemstone.gemfire.internal.tcp.Stub; import com.gemstone.gemfire.internal.util.concurrent.StoppableReentrantLock; /** @@ -2715,13 +2714,6 @@ public class DistributionManager return false; // no peers, we are alone. } - // ensure we have stubs for everyone else - Iterator it = allOthers.iterator(); - while (it.hasNext()) { - InternalDistributedMember member = (InternalDistributedMember)it.next(); - membershipManager.getStubForMember(member); - } - try { ok = op.sendStartupMessage(allOthers, STARTUP_TIMEOUT, equivs, redundancyZone, enforceUniqueZone()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java index 96f8b60..01f8c62 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/StartupMessage.java @@ -37,7 +37,6 @@ import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributes 
import com.gemstone.gemfire.internal.InternalInstantiator.InstantiatorAttributesHolder; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.tcp.Stub; /** * A message that is sent to all other distribution manager when http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java index 14ff923..d4df3bf 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/DirectChannel.java @@ -38,6 +38,7 @@ import com.gemstone.gemfire.InternalGemFireException; import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.ToDataException; import com.gemstone.gemfire.cache.TimeoutException; +import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.DMStats; @@ -64,7 +65,6 @@ import com.gemstone.gemfire.internal.tcp.Connection; import com.gemstone.gemfire.internal.tcp.ConnectionException; import com.gemstone.gemfire.internal.tcp.MemberShunnedException; import com.gemstone.gemfire.internal.tcp.MsgStreamer; -import com.gemstone.gemfire.internal.tcp.Stub; import com.gemstone.gemfire.internal.tcp.TCPConduit; import com.gemstone.gemfire.internal.util.Breadcrumbs; import com.gemstone.gemfire.internal.util.concurrent.ReentrantSemaphore; @@ -115,13 +115,6 @@ public class DirectChannel { } /** - * 
Returns the endpoint ID for the direct channel - */ - public Stub getLocalStub() { - return conduit.getId(); - } - - /** * when the initial number of members is known, this method is invoked * to ensure that connections to those members can be established in a * reasonable amount of time. See bug 39848 @@ -181,7 +174,7 @@ public class DirectChannel { this.groupOrderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); this.groupUnorderedSenderSem = new ReentrantSemaphore(MAX_GROUP_SENDERS); logger.info(LocalizedMessage.create( - LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getId())); + LocalizedStrings.DirectChannel_GEMFIRE_P2P_LISTENER_STARTED_ON__0, conduit.getLocalAddr())); } catch (ConnectionException ce) { @@ -192,48 +185,6 @@ public class DirectChannel { } -// /** -// * -// * @param addr destination for the message -// * @param stubMap map containing all the stubs -// * @param msg the original message -// * @param msgBuf the serialized message -// * @param directAck true if we need an ack -// * @param processorType the type (serialized, etc.) -// * @return if directAck, the Connection that needs the acknowledgment -// * @throws MissingStubException if we do not have a Stub for the recipient -// * @throws IOException if the message could not be sent -// */ -// private Connection attemptSingleSend(MembershipManager mgr, -// InternalDistributedMember addr, -// DistributionMessage msg, ByteBuffer msgBuf, -// boolean directAck, int processorType) -// throws MissingStubException, IOException -// { -// if (!msg.deliverToSender() && localAddr.equals(addr)) -// return null; - -// if (addr == null) -// return null; -// Stub dest = mgr.getStubForMember(addr); -// if (dest == null) { -// // This should only happen if the member is no longer in the view. 
-// Assert.assertTrue(!mgr.memberExists(addr)); -// throw new MissingStubException("No stub"); -// } -// try { -// msgBuf.position(0); // fix for bug#30680 -// Connection con = conduit.sendSync(dest, msgBuf, processorType, msg); -// if (directAck) -// return con; -// else -// return null; -// } -// catch(IOException t) { -// throw t; -// } -// } - /** * Return how many concurrent operations should be allowed by default. * since 6.6, this has been raised to Integer.MAX value from the number @@ -639,22 +590,13 @@ public class DirectChannel { continue; } - Stub stub = mgr.getStubForMember(destination); - if (stub == null) { + if (!mgr.memberExists(destination) || mgr.shutdownInProgress() || mgr.isShunned(destination)) { // This should only happen if the member is no longer in the view. if (logger.isTraceEnabled(LogMarker.DM)) { - logger.trace(LogMarker.DM, "No Stub for {}", destination); + logger.trace(LogMarker.DM, "Not a member: {}", destination); } - // The only time getStubForMember returns null is if we are - // shunning that member or we are shutting down. - // So the following assertion is wrong: - //Assert.assertTrue(!mgr.memberExists(destination)); - // instead we should: - // Assert.assertTrue(mgr.shutdownInProgress() || mgr.isShunned(destination)); - //but this is not worth doing and isShunned is not public. - // SO the assert has been deadcoded. 
if (ce == null) ce = new ConnectExceptions(); - ce.addFailure(destination, new MissingStubException(LocalizedStrings.DirectChannel_NO_STUB_0.toLocalizedString())); + ce.addFailure(destination, new ShunnedMemberException(LocalizedStrings.DirectChannel_SHUNNING_0.toLocalizedString())); } else { try { @@ -662,8 +604,8 @@ public class DirectChannel { if (ackTimeout > 0) { startTime = System.currentTimeMillis(); } - Connection con = conduit.getConnection(destination, stub, - preserveOrder, retry, startTime, ackTimeout, ackSDTimeout); + Connection con = conduit.getConnection(destination, preserveOrder, + retry, startTime, ackTimeout, ackSDTimeout); con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657 cons.add(con); @@ -823,7 +765,7 @@ public class DirectChannel { } - public void receive(DistributionMessage msg, int bytesRead, Stub connId) { + public void receive(DistributionMessage msg, int bytesRead) { if (disconnected) { return; } @@ -844,10 +786,6 @@ public class DirectChannel { } } -// public void newMemberConnected(InternalDistributedMember member, Stub id) { -// receiver.newMemberConnected(member, id); -// } - public InternalDistributedMember getLocalAddress() { return this.localAddr; } @@ -930,13 +868,6 @@ public class DirectChannel { } } - /** Create a TCPConduit stub from a JGroups InternalDistributedMember */ - public Stub createConduitStub(InternalDistributedMember addr) { - int port = addr.getDirectChannelPort(); - Stub stub = new Stub(addr.getInetAddress(), port, addr.getVmViewId()); - return stub; - } - public void closeEndpoint(InternalDistributedMember member, String reason) { closeEndpoint(member, reason, true); } @@ -948,7 +879,7 @@ public class DirectChannel { public void closeEndpoint(InternalDistributedMember member, String reason, boolean notifyDisconnect) { TCPConduit tc = this.conduit; if (tc != null) { - tc.removeEndpoint(createConduitStub(member), reason, notifyDisconnect); + tc.removeEndpoint(member, reason, notifyDisconnect); } } @@ 
-962,7 +893,7 @@ public class DirectChannel { * the map to add the state to * @since 5.1 */ - public void getChannelStates(Stub member, Map result) + public void getChannelStates(DistributedMember member, Map result) { TCPConduit tc = this.conduit; if (tc != null) { @@ -974,7 +905,7 @@ public class DirectChannel { * wait for the given connections to process the number of messages * associated with the connection in the given map */ - public void waitForChannelState(Stub member, Map channelState) + public void waitForChannelState(DistributedMember member, Map channelState) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); @@ -987,7 +918,7 @@ public class DirectChannel { /** * returns true if there are still receiver threads for the given member */ - public boolean hasReceiversFor(Stub mbr) { + public boolean hasReceiversFor(DistributedMember mbr) { return this.conduit.hasReceiversFor(mbr); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java deleted file mode 100644 index 49b4486..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/MissingStubException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.distributed.internal.direct; - -import com.gemstone.gemfire.GemFireCheckedException; - -/** - * Exception thrown when the TCPConduit is unable to acquire a stub - * for the given recipient. - * - * @author jpenney - * - */ -public class MissingStubException extends GemFireCheckedException -{ - - private static final long serialVersionUID = -6455664684151074915L; - - public MissingStubException(String msg) { - super(msg); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java new file mode 100644 index 0000000..59db762 --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/direct/ShunnedMemberException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.distributed.internal.direct; + +import com.gemstone.gemfire.GemFireCheckedException; + +/** + * Exception thrown when a member is no longer in the distributed system + * + */ +public class ShunnedMemberException extends GemFireCheckedException +{ + + private static final long serialVersionUID = -6455664684151074915L; + + public ShunnedMemberException(String msg) { + super(msg); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java index a46680b..7416efa 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MembershipManager.java @@ -27,7 +27,6 @@ import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.internal.logging.InternalLogWriter; -import com.gemstone.gemfire.internal.tcp.Stub; /** * A MembershipManager is responsible for reporting a MemberView, as well as @@ -74,7 +73,7 @@ public interface 
MembershipManager { * @param m the member * @return true if it still exists */ - public boolean memberExists(InternalDistributedMember m); + public boolean memberExists(DistributedMember m); /** * Is this manager still connected? If it has not been initialized, this @@ -143,25 +142,6 @@ public interface MembershipManager { throws NotSerializableException; /** - * Return a {@link Stub} referring to the given member. A <em>null</em> may - * be returned if the system is not employing stubs for communication. - * - * @param m the member - * @return the stub - */ - public Stub getStubForMember(InternalDistributedMember m); - - /** - * Return a {@link InternalDistributedMember} associated with the given Stub. This - * method may return a null if Stubs are not being used. - * @param s Stub to look up - * @param validated true if member must be in the current view - * @return the member associated with the given stub, if any - */ - public InternalDistributedMember getMemberForStub(Stub s, boolean validated); - - - /** * Indicates to the membership manager that the system is shutting down. * Typically speaking, this means that new connection attempts are to be * ignored and disconnect failures are to be (more) tolerated. @@ -286,7 +266,7 @@ public interface MembershipManager { */ public void warnShun(DistributedMember mbr); - public boolean addSurpriseMember(DistributedMember mbr, Stub stub); + public boolean addSurpriseMember(DistributedMember mbr); /** if a StartupMessage is going to reject a new member, this should be used * to make sure we don't keep that member on as a "surprise member" @@ -307,6 +287,11 @@ public interface MembershipManager { * @return true if the member is a surprise member */ public boolean isSurpriseMember(DistributedMember m); + + /** + * Returns true if the member is being shunned + */ + public boolean isShunned(DistributedMember m); /** * Forces use of UDP for communications in the current thread. 
UDP is http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index 0b7a544..7be0a3a 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -94,7 +94,6 @@ import com.gemstone.gemfire.internal.logging.log4j.LogMarker; import com.gemstone.gemfire.internal.shared.StringPrintWriter; import com.gemstone.gemfire.internal.tcp.ConnectExceptions; import com.gemstone.gemfire.internal.tcp.MemberShunnedException; -import com.gemstone.gemfire.internal.tcp.Stub; import com.gemstone.gemfire.internal.util.Breadcrumbs; public class GMSMembershipManager implements MembershipManager, Manager @@ -156,7 +155,6 @@ public class GMSMembershipManager implements MembershipManager, Manager boolean crashed; String reason; DistributionMessage dmsg; - Stub stub; NetView gmsView; @Override @@ -165,7 +163,7 @@ public class GMSMembershipManager implements MembershipManager, Manager sb.append("kind="); switch (kind) { case SURPRISE_CONNECT: - sb.append("connect; member = <" + member + ">; stub = " + stub); + sb.append("connect; member = <" + member + ">"); break; case VIEW: String text = gmsView.toString(); @@ -184,12 +182,10 @@ public class GMSMembershipManager implements MembershipManager, Manager /** * Create a surprise connect event * @param member the member connecting - * @param id the stub */ - StartupEvent(final InternalDistributedMember member, final Stub id) { + 
StartupEvent(final InternalDistributedMember member) { this.kind = SURPRISE_CONNECT; this.member = member; - this.stub = id; } /** * Indicate if this is a surprise connect event @@ -282,24 +278,6 @@ public class GMSMembershipManager implements MembershipManager, Manager volatile boolean hasJoined; /** - * a map keyed on InternalDistributedMember, values are Stubs that represent direct - * channels to other systems - * - * Accesses must be under the read or write lock of {@link #latestViewLock}. - */ - protected final Map<InternalDistributedMember, Stub> memberToStubMap = - new ConcurrentHashMap<InternalDistributedMember, Stub>(); - - /** - * a map of direct channels (Stub) to InternalDistributedMember. key instanceof Stub - * value instanceof InternalDistributedMember - * - * Accesses must be under the read or write lock of {@link #latestViewLock}. - */ - protected final Map<Stub, InternalDistributedMember> stubToMemberMap = - new ConcurrentHashMap<Stub, InternalDistributedMember>(); - - /** * Members of the distributed system that we believe have shut down. * Keys are instances of {@link InternalDistributedMember}, values are * Longs indicating the time this member was shunned. 
@@ -547,12 +525,6 @@ public class GMSMembershipManager implements MembershipManager, Manager } } - // fix for bug #42006, lingering old identity - Object oldStub = this.memberToStubMap.remove(m); - if (oldStub != null) { - this.stubToMemberMap.remove(oldStub); - } - if (shutdownInProgress()) { addShunnedMember(m); continue; // no additions processed after shutdown begins @@ -806,9 +778,6 @@ public class GMSMembershipManager implements MembershipManager, Manager if (directChannel != null) { directChannel.setLocalAddr(address); - Stub stub = directChannel.getLocalStub(); - memberToStubMap.put(address, stub); - stubToMemberMap.put(stub, address); } this.hasJoined = true; @@ -905,17 +874,15 @@ public class GMSMembershipManager implements MembershipManager, Manager /** * Process a surprise connect event, or place it on the startup queue. * @param member the member - * @param stub its stub */ protected void handleOrDeferSurpriseConnect(InternalDistributedMember member) { - Stub stub = new Stub(member.getInetAddress(), member.getDirectChannelPort(), member.getVmViewId()); synchronized (startupLock) { if (!processingEvents) { - startupMessages.add(new StartupEvent(member, stub)); + startupMessages.add(new StartupEvent(member)); return; } } - processSurpriseConnect(member, stub); + processSurpriseConnect(member); } public void startupMessageFailed(DistributedMember mbr, String failureMessage) { @@ -941,12 +908,9 @@ public class GMSMembershipManager implements MembershipManager, Manager * been added, simply returns; else adds the member. 
* * @param dm the member joining - * @param stub the member's stub */ - public boolean addSurpriseMember(DistributedMember dm, - Stub stub) { + public boolean addSurpriseMember(DistributedMember dm) { final InternalDistributedMember member = (InternalDistributedMember)dm; - Stub s = null; boolean warn = false; latestViewLock.writeLock().lock(); @@ -1009,16 +973,6 @@ public class GMSMembershipManager implements MembershipManager, Manager startCleanupTimer(); } // cleanupTimer == null - // fix for bug #42006, lingering old identity - Object oldStub = this.memberToStubMap.remove(member); - if (oldStub != null) { - this.stubToMemberMap.remove(oldStub); - } - - s = stub == null ? getStubForMember(member) : stub; - // Make sure that channel information is consistent - addChannel(member, s); - // Ensure that the member is accounted for in the view // Conjure up a new view including the new member. This is necessary // because we are about to tell the listener about a new member, so @@ -1154,7 +1108,7 @@ public class GMSMembershipManager implements MembershipManager, Manager // If it's a new sender, wait our turn, generate the event if (isNew) { - shunned = !addSurpriseMember(m, getStubForMember(m)); + shunned = !addSurpriseMember(m); } // isNew } @@ -1166,7 +1120,7 @@ public class GMSMembershipManager implements MembershipManager, Manager if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging warnShun(m); logger.info("Membership: Ignoring message from shunned member <{}>:{}", m, msg); - throw new MemberShunnedException(getStubForMember(m)); + throw new MemberShunnedException(m); } listener.messageReceived(msg); @@ -1248,13 +1202,11 @@ public class GMSMembershipManager implements MembershipManager, Manager * grabbed a stable view if this is really a new member. 
* * @param member - * @param stub */ private void processSurpriseConnect( - InternalDistributedMember member, - Stub stub) + InternalDistributedMember member) { - addSurpriseMember(member, stub); + addSurpriseMember(member); } /** @@ -1276,7 +1228,7 @@ public class GMSMembershipManager implements MembershipManager, Manager processView(o.gmsView.getViewId(), o.gmsView); } else if (o.isSurpriseConnect()) { // connect - processSurpriseConnect(o.member, o.stub); + processSurpriseConnect(o.member); } else // sanity @@ -1450,7 +1402,7 @@ public class GMSMembershipManager implements MembershipManager, Manager } } - public boolean memberExists(InternalDistributedMember m) { + public boolean memberExists(DistributedMember m) { latestViewLock.readLock().lock(); NetView v = latestView; latestViewLock.readLock().unlock(); @@ -1525,12 +1477,6 @@ public class GMSMembershipManager implements MembershipManager, Manager directChannel.emergencyClose(); } - // could we guarantee not to allocate objects? We're using Darrel's - // factory, so it's possible that an unsafe implementation could be - // introduced here. 
-// stubToMemberMap.clear(); -// memberToStubMap.clear(); - if (DEBUG) { System.err.println("DEBUG: done closing GroupMembershipService"); } @@ -1767,7 +1713,7 @@ public class GMSMembershipManager implements MembershipManager, Manager allDestinations = true; latestViewLock.writeLock().lock(); try { - Set keySet = memberToStubMap.keySet(); + List<InternalDistributedMember> keySet = latestView.getMembers(); keys = new InternalDistributedMember[keySet.size()]; keys = (InternalDistributedMember[])keySet.toArray(keys); } finally { @@ -2020,80 +1966,6 @@ public class GMSMembershipManager implements MembershipManager, Manager // not currently supported by this manager } - /** - * Get or create stub for given member - */ - public Stub getStubForMember(InternalDistributedMember m) - { - if (shutdownInProgress) { - throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause()); - } - - if (services.getConfig().getDistributionConfig().getDisableTcp()) { - return new Stub(m.getInetAddress(), m.getPort(), m.getVmViewId()); - } - - // Return existing one if it is already in place - Stub result; - result = (Stub)memberToStubMap.get(m); - if (result != null) - return result; - - latestViewLock.writeLock().lock(); - try { - // Do all of this work in a critical region to prevent - // members from slipping in during shutdown - if (shutdownInProgress()) - return null; // don't try to create a stub during shutdown - if (isShunned(m)) - return null; // don't let zombies come back to life - - // OK, create one. Update the table to reflect the creation. 
- result = directChannel.createConduitStub(m); - addChannel(m, result); - } finally { - latestViewLock.writeLock().unlock(); - } - return result; - } - - public InternalDistributedMember getMemberForStub(Stub s, boolean validated) - { - latestViewLock.writeLock().lock(); - try { - if (shutdownInProgress) { - throw new DistributedSystemDisconnectedException(LocalizedStrings.GroupMembershipService_DISTRIBUTEDSYSTEM_IS_SHUTTING_DOWN.toLocalizedString(), services.getShutdownCause()); - } - InternalDistributedMember result = (InternalDistributedMember) - stubToMemberMap.get(s); - if (result != null) { - if (validated && !this.latestView.contains(result)) { - // Do not return this member unless it is in the current view. - if (!surpriseMembers.containsKey(result)) { - // if not a surprise member, this stub is lingering and should be removed - stubToMemberMap.remove(s); - memberToStubMap.remove(result); - } - result = null; - // fall through to see if there is a newer member using the same direct port - } - } - if (result == null) { - // it may have not been added to the stub->idm map yet, so check the current view - for (InternalDistributedMember idm: latestView.getMembers()) { - if (GMSUtil.compareAddresses(idm.getInetAddress(), s.getInetAddress()) == 0 - && idm.getDirectChannelPort() == s.getPort()) { - addChannel(idm, s); - return idm; - } - } - } - return result; - } finally { - latestViewLock.writeLock().unlock(); - } - } - public void setShutdown() { latestViewLock.writeLock().lock(); @@ -2109,24 +1981,6 @@ public class GMSMembershipManager implements MembershipManager, Manager return shutdownInProgress || (dm != null && dm.shutdownInProgress()); } - /** - * Add a mapping from the given member to the given stub. Must - * be called with {@link #latestViewLock} held. 
- * - * @param member - * @param theChannel - */ - protected void addChannel(InternalDistributedMember member, Stub theChannel) - { - if (theChannel != null) { - // Don't overwrite existing stub information with a null - this.memberToStubMap.put(member, theChannel); - - // Can't create reverse mapping if the stub is null - this.stubToMemberMap.put(theChannel, member); - } - } - /** * Clean up and create consistent new view with member removed. @@ -2137,12 +1991,6 @@ public class GMSMembershipManager implements MembershipManager, Manager protected void destroyMember(final InternalDistributedMember member, boolean crashed, final String reason) { - // Clean up the maps - Stub theChannel = (Stub)memberToStubMap.remove(member); - if (theChannel != null) { - this.stubToMemberMap.remove(theChannel); - } - // Make sure it is removed from the view latestViewLock.writeLock().lock(); try { @@ -2365,12 +2213,11 @@ public class GMSMembershipManager implements MembershipManager, Manager /* non-thread-owned serial channels and high priority channels are not * included */ - public Map getMessageState(DistributedMember member, boolean includeMulticast) { + public Map getChannelStates(DistributedMember member, boolean includeMulticast) { Map result = new HashMap(); - Stub stub = (Stub)memberToStubMap.get(member); DirectChannel dc = directChannel; - if (stub != null && dc != null) { - dc.getChannelStates(stub, result); + if (dc != null) { + dc.getChannelStates(member, result); } services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast); return result; @@ -2381,15 +2228,8 @@ public class GMSMembershipManager implements MembershipManager, Manager { if (Thread.interrupted()) throw new InterruptedException(); DirectChannel dc = directChannel; - Stub stub; - latestViewLock.writeLock().lock(); - try { - stub = (Stub)memberToStubMap.get(otherMember); - } finally { - latestViewLock.writeLock().unlock(); - } - if (dc != null && stub != null) { - 
dc.waitForChannelState(stub, state); + if (dc != null) { + dc.waitForChannelState(otherMember, channelState); } services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state); } @@ -2405,7 +2245,6 @@ public class GMSMembershipManager implements MembershipManager, Manager boolean result = false; DirectChannel dc = directChannel; InternalDistributedMember idm = (InternalDistributedMember)mbr; - Stub stub = new Stub(idm.getInetAddress(), idm.getPort(), idm.getVmViewId()); int memberTimeout = this.services.getConfig().getDistributionConfig().getMemberTimeout(); long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10; boolean wait; @@ -2413,7 +2252,7 @@ public class GMSMembershipManager implements MembershipManager, Manager do { wait = false; if (dc != null) { - if (dc.hasReceiversFor(stub)) { + if (dc.hasReceiversFor(idm)) { wait = true; } if (wait && logger.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java index 7bb97b9..780fe18 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java @@ -1109,7 +1109,7 @@ class ParentLocalizedStrings { public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_BECAUSE_0_HAS_DISAPPEARED = new StringId(2086, "Ending reconnect attempt because {0} has disappeared."); public static final StringId TCPConduit_ENDING_RECONNECT_ATTEMPT_TO_0_BECAUSE_SHUTDOWN_HAS_STARTED = new StringId(2087, "Ending reconnect attempt to {0} because shutdown has started."); public static 
final StringId TCPConduit_ERROR_SENDING_MESSAGE_TO_0_WILL_REATTEMPT_1 = new StringId(2088, "Error sending message to {0} (will reattempt): {1}"); - public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket and Stub on port {0} with address {1}"); + public static final StringId TCPConduit_EXCEPTION_CREATING_SERVERSOCKET = new StringId(2089, "While creating ServerSocket on port {0} with address {1}"); public static final StringId TCPConduit_EXCEPTION_PARSING_P2PIDLECONNECTIONTIMEOUT = new StringId(2090, "exception parsing p2p.idleConnectionTimeout"); public static final StringId TCPConduit_EXCEPTION_PARSING_P2PTCPBUFFERSIZE = new StringId(2091, "exception parsing p2p.tcpBufferSize"); public static final StringId TCPConduit_FAILED_TO_ACCEPT_CONNECTION_FROM_0_BECAUSE_1 = new StringId(2092, "Failed to accept connection from {0} because {1}"); @@ -1444,7 +1444,7 @@ class ParentLocalizedStrings { public static final StringId Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1 = new StringId(2432, "Detected wrong version of GemFire product during handshake. 
Expected {0} but found {1}"); public static final StringId Connection_FORCED_DISCONNECT_SENT_TO_0 = new StringId(2433, "Forced disconnect sent to {0}"); public static final StringId Connection_HANDSHAKE_FAILED = new StringId(2434, "Handshake failed"); - public static final StringId Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP = new StringId(2435, "Member for stub {0} left the group"); + public static final StringId Connection_MEMBER_LEFT_THE_GROUP = new StringId(2435, "Member {0} left the group"); public static final StringId Connection_NOT_CONNECTED_TO_0 = new StringId(2436, "Not connected to {0}"); public static final StringId Connection_NULL_CONNECTIONTABLE = new StringId(2437, "Null ConnectionTable"); public static final StringId Connection_SOCKET_HAS_BEEN_CLOSED = new StringId(2438, "socket has been closed"); @@ -1542,7 +1542,7 @@ class ParentLocalizedStrings { public static final StringId DefaultQuery_WHEN_QUERYING_A_PARTITIONED_REGION_THE_PROJECTIONS_MUST_NOT_REFERENCE_ANY_REGIONS = new StringId(2530, "When querying a Partitioned Region, the projections must not reference any regions"); public static final StringId DestroyMessage_FAILED_SENDING_0 = new StringId(2531, "Failed sending < {0} >"); public static final StringId DirectChannel_COMMUNICATIONS_DISCONNECTED = new StringId(2532, "communications disconnected"); - public static final StringId DirectChannel_NO_STUB_0 = new StringId(2533, "No stub {0}"); + public static final StringId DirectChannel_SHUNNING_0 = new StringId(2533, "Member is being shunned: {0}"); public static final StringId DirectChannel_UNKNOWN_ERROR_SERIALIZING_MESSAGE = new StringId(2534, "Unknown error serializing message"); public static final StringId DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING = new StringId(2535, "An IOException was thrown while serializing."); public static final StringId DiskEntry_DISK_REGION_IS_NULL = new StringId(2536, "Disk region is null"); 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java index f918812..74660da 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Connection.java @@ -50,6 +50,7 @@ import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.CancelException; import com.gemstone.gemfire.SystemFailure; import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.ConflationKey; import com.gemstone.gemfire.distributed.internal.DM; @@ -72,7 +73,6 @@ import com.gemstone.gemfire.internal.Assert; import com.gemstone.gemfire.internal.ByteArrayDataInput; import com.gemstone.gemfire.internal.DSFIDFactory; import com.gemstone.gemfire.internal.InternalDataSerializer; -import com.gemstone.gemfire.internal.SocketCloser; import com.gemstone.gemfire.internal.SocketCreator; import com.gemstone.gemfire.internal.SocketUtils; import com.gemstone.gemfire.internal.SystemTimer; @@ -222,11 +222,6 @@ public class Connection implements Runnable { /** the ID string of the conduit (for logging) */ String conduitIdStr; - /** remoteId identifies the remote conduit's listener. It does NOT - identify the "port" that this connection's socket is attached - to, which is a different thing altogether */ - Stub remoteId; - /** Identifies the java group member on the other side of the connection. 
*/ InternalDistributedMember remoteAddr; @@ -801,7 +796,7 @@ public class Connection implements Runnable { } if (success) { if (this.isReceiver) { - needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr, this.remoteId); + needToClose = !owner.getConduit().getMembershipManager().addSurpriseMember(this.remoteAddr); if (needToClose) { reason = "this member is shunned"; } @@ -845,7 +840,7 @@ public class Connection implements Runnable { * @param beingSick */ private void asyncClose(boolean beingSick) { - // note: remoteId may be null if this is a receiver that hasn't finished its handshake + // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake // we do the close in a background thread because the operation may hang if // there is a problem with the network. See bug #46659 @@ -1018,8 +1013,7 @@ public class Connection implements Runnable { protected static Connection createSender(final MembershipManager mgr, final ConnectionTable t, final boolean preserveOrder, - final Stub key, - final InternalDistributedMember remoteAddr, + final DistributedMember remoteAddr, final boolean sharedResource, final long startTime, final long ackTimeout, @@ -1074,9 +1068,8 @@ public class Connection implements Runnable { } if (firstTime) { firstTime = false; - InternalDistributedMember m = mgr.getMemberForStub(key, true); - if (m == null) { - throw new IOException("Member for stub " + key + " left the group"); + if (!mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress()) { + throw new IOException("Member " + remoteAddr + " left the system"); } } else { @@ -1084,7 +1077,7 @@ public class Connection implements Runnable { // alert listener should not prevent cache operations from continuing if (AlertAppender.isThreadAlerting()) { // do not change the text of this exception - it is looked for in exception handlers - throw new IOException("Cannot form connection to alert listener " + key); + 
throw new IOException("Cannot form connection to alert listener " + remoteAddr); } // Wait briefly... @@ -1097,20 +1090,19 @@ public class Connection implements Runnable { t.getConduit().getCancelCriterion().checkCancelInProgress(ie); } t.getConduit().getCancelCriterion().checkCancelInProgress(null); - InternalDistributedMember m = mgr.getMemberForStub(key, true); - if (m == null) { - throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key)); + if (giveUpOnMember(mgr, remoteAddr)) { + throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr)); } if (!warningPrinted) { warningPrinted = true; - logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, m)); + logger.warn(LocalizedMessage.create(LocalizedStrings.Connection_CONNECTION_ATTEMPTING_RECONNECT_TO_PEER__0, remoteAddr)); } t.getConduit().stats.incReconnectAttempts(); } //create connection try { conn = null; - conn = new Connection(mgr, t, preserveOrder, key, remoteAddr, sharedResource); + conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource); } catch (javax.net.ssl.SSLHandshakeException se) { // no need to retry if certificates were rejected @@ -1118,8 +1110,7 @@ public class Connection implements Runnable { } catch (IOException ioe) { // Only give up if the member leaves the view. 
- InternalDistributedMember m = mgr.getMemberForStub(key, true); - if (m == null) { + if (giveUpOnMember(mgr, remoteAddr)) { throw ioe; } t.getConduit().getCancelCriterion().checkCancelInProgress(null); @@ -1130,7 +1121,7 @@ public class Connection implements Runnable { connectionErrorLogged = true; // otherwise change to use 100ms intervals causes a lot of these logger.info(LocalizedMessage.create( LocalizedStrings.Connection_CONNECTION_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, - new Object[] {sharedResource, preserveOrder, m, ioe})); + new Object[] {sharedResource, preserveOrder, remoteAddr, ioe})); } } // IOException finally { @@ -1146,9 +1137,8 @@ public class Connection implements Runnable { // something went wrong while reading the handshake // and the socket was closed or this guy sent us a // ShutdownMessage - InternalDistributedMember m = mgr.getMemberForStub(key, true); - if (m == null) { - throw new IOException(LocalizedStrings.Connection_MEMBER_FOR_STUB_0_LEFT_THE_GROUP.toLocalizedString(key)); + if (giveUpOnMember(mgr, remoteAddr)) { + throw new IOException(LocalizedStrings.Connection_MEMBER_LEFT_THE_GROUP.toLocalizedString(remoteAddr)); } t.getConduit().getCancelCriterion().checkCancelInProgress(null); // no success but no need to log; just retry @@ -1161,8 +1151,7 @@ public class Connection implements Runnable { throw e; } catch (ConnectionException e) { - InternalDistributedMember m = mgr.getMemberForStub(key, true); - if (m == null) { + if (giveUpOnMember(mgr, remoteAddr)) { IOException ioe = new IOException(LocalizedStrings.Connection_HANDSHAKE_FAILED.toLocalizedString()); ioe.initCause(e); throw ioe; @@ -1170,17 +1159,16 @@ public class Connection implements Runnable { t.getConduit().getCancelCriterion().checkCancelInProgress(null); logger.info(LocalizedMessage.create( LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, - new Object[] {sharedResource, preserveOrder, m,e})); + new Object[] {sharedResource, 
preserveOrder, remoteAddr ,e})); } catch (IOException e) { - InternalDistributedMember m = mgr.getMemberForStub(key, true); - if (m == null) { + if (giveUpOnMember(mgr, remoteAddr)) { throw e; } t.getConduit().getCancelCriterion().checkCancelInProgress(null); logger.info(LocalizedMessage.create( LocalizedStrings.Connection_CONNECTION_HANDSHAKE_FAILED_TO_CONNECT_TO_PEER_0_BECAUSE_1, - new Object[] {sharedResource, preserveOrder, m,e})); + new Object[] {sharedResource, preserveOrder, remoteAddr ,e})); if (!sharedResource && "Too many open files".equals(e.getMessage())) { t.fileDescriptorsExhausted(); } @@ -1220,7 +1208,7 @@ public class Connection implements Runnable { if (conn == null) { throw new ConnectionException( LocalizedStrings.Connection_CONNECTION_FAILED_CONSTRUCTION_FOR_PEER_0 - .toLocalizedString(mgr.getMemberForStub(key, true))); + .toLocalizedString(remoteAddr)); } if (preserveOrder && BATCH_SENDS) { conn.createBatchSendBuffer(); @@ -1228,12 +1216,15 @@ public class Connection implements Runnable { conn.finishedConnecting = true; return conn; } + + private static boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) { + return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress(); + } - private void setRemoteAddr(InternalDistributedMember m, Stub stub) { + private void setRemoteAddr(DistributedMember m) { this.remoteAddr = this.owner.getDM().getCanonicalId(m); - this.remoteId = stub; MembershipManager mgr = this.owner.owner.getMembershipManager(); - mgr.addSurpriseMember(m, stub); + mgr.addSurpriseMember(m); } /** creates a new connection to a remote server. 
@@ -1243,11 +1234,11 @@ public class Connection implements Runnable { private Connection(MembershipManager mgr, ConnectionTable t, boolean preserveOrder, - Stub key, - InternalDistributedMember remoteAddr, + DistributedMember remoteID, boolean sharedResource) throws IOException, DistributedSystemDisconnectedException { + InternalDistributedMember remoteAddr = (InternalDistributedMember)remoteID; if (t == null) { throw new IllegalArgumentException(LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString()); } @@ -1255,7 +1246,7 @@ public class Connection implements Runnable { this.owner = t; this.sharedResource = sharedResource; this.preserveOrder = preserveOrder; - setRemoteAddr(remoteAddr, key); + setRemoteAddr(remoteAddr); this.conduitIdStr = this.owner.getConduit().getId().toString(); this.handshakeRead = false; this.handshakeCancelled = false; @@ -1265,7 +1256,7 @@ public class Connection implements Runnable { // connect to listening socket - InetSocketAddress addr = new InetSocketAddress(remoteId.getInetAddress(), remoteId.getPort()); + InetSocketAddress addr = new InetSocketAddress(remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort()); if (useNIO()) { SocketChannel channel = SocketChannel.open(); this.owner.addConnectingSocket(channel.socket(), addr.getAddress()); @@ -1325,15 +1316,15 @@ public class Connection implements Runnable { else { if (TCPConduit.useSSL) { // socket = javax.net.ssl.SSLSocketFactory.getDefault() - // .createSocket(remoteId.getInetAddress(), remoteId.getPort()); + // .createSocket(remoteAddr.getInetAddress(), remoteAddr.getPort()); int socketBufferSize = sharedResource ? 
SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize; - this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteId.getInetAddress(), remoteId.getPort(), socketBufferSize ); + this.socket = SocketCreator.getDefaultInstance().connectForServer( remoteAddr.getInetAddress(), remoteAddr.getDirectChannelPort(), socketBufferSize ); // Set the receive buffer size local fields. It has already been set in the socket. setSocketBufferSize(this.socket, false, socketBufferSize, true); setSendBufferSize(this.socket); } else { - //socket = new Socket(remoteId.getInetAddress(), remoteId.getPort()); + //socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort()); Socket s = new Socket(); this.socket = s; s.setTcpNoDelay(true); @@ -1639,8 +1630,8 @@ public class Connection implements Runnable { // we can't wait for the reader thread when running in an IBM JRE. See // bug 41889 if (this.owner.owner.config.getEnableNetworkPartitionDetection() || - this.owner.owner.getLocalId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE || - this.owner.owner.getLocalId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { + this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE || + this.owner.owner.getLocalAddr().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor")); } { @@ -1689,16 +1680,16 @@ public class Connection implements Runnable { // Only remove endpoint if sender. 
if (this.finishedConnecting) { // only remove endpoint if our constructor finished - this.owner.removeEndpoint(this.remoteId, reason); + this.owner.removeEndpoint(this.remoteAddr, reason); } } } else { - this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this); + this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this); } } else if (!this.isReceiver) { - this.owner.removeThreadConnection(this.remoteId, this); + this.owner.removeThreadConnection(this.remoteAddr, this); } } else { @@ -1706,10 +1697,10 @@ public class Connection implements Runnable { // has never added this Connection to its maps since // the calls in this block use our identity to do the removes. if (this.sharedResource) { - this.owner.removeSharedConnection(reason, this.remoteId, this.preserveOrder, this); + this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this); } else if (!this.isReceiver) { - this.owner.removeThreadConnection(this.remoteId, this); + this.owner.removeThreadConnection(this.remoteAddr, this); } } } @@ -1753,7 +1744,7 @@ public class Connection implements Runnable { } finally { // bug36060: do the socket close within a finally block if (logger.isDebugEnabled()) { - logger.debug("Stopping {} for {}", p2pReaderName(), remoteId); + logger.debug("Stopping {} for {}", p2pReaderName(), remoteAddr); } initiateSuspicionIfSharedUnordered(); if (this.isReceiver) { @@ -2338,8 +2329,7 @@ public class Connection implements Runnable { .toLocalizedString(new Object[]{new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)})); } InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis); - Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId()); - setRemoteAddr(remote, stub); + setRemoteAddr(remote); Thread.currentThread().setName(LocalizedStrings.Connection_P2P_MESSAGE_READER_FOR_0.toLocalizedString(this.remoteAddr, 
this.socket.getPort())); this.sharedResource = dis.readBoolean(); this.preserveOrder = dis.readBoolean(); @@ -2377,7 +2367,7 @@ public class Connection implements Runnable { } if (logger.isDebugEnabled()) { - logger.debug("{} remoteId is {} {}", p2pReaderName(), this.remoteId, + logger.debug("{} remoteAddr is {} {}", p2pReaderName(), this.remoteAddr, (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : "")); } @@ -2555,7 +2545,7 @@ public class Connection implements Runnable { throws IOException, ConnectionException { if (!connected) { - throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteId)); + throw new ConnectionException(LocalizedStrings.Connection_NOT_CONNECTED_TO_0.toLocalizedString(this.remoteAddr)); } if (this.batchFlusher != null) { batchSend(buffer); @@ -2778,7 +2768,7 @@ public class Connection implements Runnable { if (this.disconnectRequested) { buffer.position(origBufferPos); // we have given up so just drop this message. - throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteId)); + throw new ConnectionException(LocalizedStrings.Connection_FORCED_DISCONNECT_SENT_TO_0.toLocalizedString(this.remoteAddr)); } if (!force && !this.asyncQueuingInProgress) { // reset buffer since we will be sending it. 
This fixes bug 34832 @@ -2980,7 +2970,7 @@ public class Connection implements Runnable { } DM dm = this.owner.getDM(); if (dm == null) { - this.owner.removeEndpoint(this.remoteId, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString()); + this.owner.removeEndpoint(this.remoteAddr, LocalizedStrings.Connection_NO_DISTRIBUTION_MANAGER.toLocalizedString()); return; } dm.getMembershipManager().requestMemberRemoval(this.remoteAddr, @@ -3001,7 +2991,7 @@ public class Connection implements Runnable { return; } } - this.owner.removeEndpoint(this.remoteId, + this.owner.removeEndpoint(this.remoteAddr, LocalizedStrings.Connection_FORCE_DISCONNECT_TIMED_OUT.toLocalizedString()); if (dm.getOtherDistributionManagerIds().contains(this.remoteAddr)) { if (logger.isDebugEnabled()) { @@ -3110,7 +3100,7 @@ public class Connection implements Runnable { stats.incAsyncThreads(-1); stats.incAsyncQueues(-1); if (logger.isDebugEnabled()) { - logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteId, remoteAddr); + logger.debug("runNioPusher terminated id={} from {}/{}", conduitIdStr, remoteAddr, remoteAddr); } } } finally { @@ -3837,8 +3827,7 @@ public class Connection implements Runnable { throw new IllegalStateException(LocalizedStrings.Connection_DETECTED_WRONG_VERSION_OF_GEMFIRE_PRODUCT_DURING_HANDSHAKE_EXPECTED_0_BUT_FOUND_1.toLocalizedString(new Object[] {new Byte(HANDSHAKE_VERSION), new Byte(handShakeByte)})); } InternalDistributedMember remote = DSFIDFactory.readInternalDistributedMember(dis); - Stub stub = new Stub(remote.getInetAddress()/*fix for bug 33615*/, remote.getDirectChannelPort(), remote.getVmViewId()); - setRemoteAddr(remote, stub); + setRemoteAddr(remote); this.sharedResource = dis.readBoolean(); this.preserveOrder = dis.readBoolean(); this.uniqueId = dis.readLong(); @@ -3897,7 +3886,7 @@ public class Connection implements Runnable { return; } if (logger.isDebugEnabled()) { - logger.debug("P2P handshake remoteId is {}{}", 
this.remoteId, + logger.debug("P2P handshake remoteAddr is {}{}", this.remoteAddr, (this.remoteVersion != null ? " (" + this.remoteVersion + ')' : "")); } try { @@ -4031,12 +4020,6 @@ public class Connection implements Runnable { this.accessed = true; } - /** returns the ConnectionKey stub representing the other side of - this connection (host:port) */ - public final Stub getRemoteId() { - return remoteId; - } - /** return the DM id of the guy on the other side of this connection. */ public final InternalDistributedMember getRemoteAddress() { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java index bac356c..3816efe 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ConnectionTable.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.SystemFailure; +import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; @@ -60,7 +61,7 @@ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; /** <p>ConnectionTable holds all of the Connection objects in a conduit. 
Connections represent a pipe between two endpoints represented - by generic Stubs.</p> + by generic DistributedMembers.</p> @author Bruce Schuchardt @author Darrel Schneider @@ -345,7 +346,7 @@ public class ConnectionTable { /** * Process a newly created PendingConnection * - * @param id Stub on which the connection is created + * @param id DistributedMember on which the connection is created * @param sharedResource whether the connection is used by multiple threads * @param preserveOrder whether to preserve order * @param m map to add the connection to @@ -357,7 +358,7 @@ public class ConnectionTable { * @throws IOException if unable to connect * @throws DistributedSystemDisconnectedException */ - private Connection handleNewPendingConnection(Stub id, boolean sharedResource, + private Connection handleNewPendingConnection(DistributedMember id, boolean sharedResource, boolean preserveOrder, Map m, PendingConnection pc, long startTime, long ackThreshold, long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException @@ -366,7 +367,7 @@ public class ConnectionTable { Connection con = null; try { con = Connection.createSender(owner.getMembershipManager(), this, preserveOrder, - id, this.owner.getMemberForStub(id, false), + id, sharedResource, startTime, ackThreshold, ackSAThreshold); this.owner.stats.incSenders(sharedResource, preserveOrder); @@ -442,7 +443,7 @@ public class ConnectionTable { * unordered or conserve-sockets * note that unordered connections are currently always shared * - * @param id the Stub on which we are creating a connection + * @param id the DistributedMember on which we are creating a connection * @param threadOwnsResources whether unordered conn is owned by the current thread * @param preserveOrder whether to preserve order * @param startTime the ms clock start time for the operation @@ -452,7 +453,7 @@ public class ConnectionTable { * @throws IOException if unable to create the connection * @throws 
DistributedSystemDisconnectedException */ - private Connection getUnorderedOrConserveSockets(Stub id, + private Connection getUnorderedOrConserveSockets(DistributedMember id, boolean threadOwnsResources, boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException @@ -527,7 +528,7 @@ public class ConnectionTable { * @throws IOException if the connection could not be created * @throws DistributedSystemDisconnectedException */ - Connection getOrderedAndOwned(Stub id, long startTime, long ackTimeout, long ackSATimeout) + Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { Connection result = null; @@ -566,7 +567,7 @@ public class ConnectionTable { // OK, we have to create a new connection. result = Connection.createSender(owner.getMembershipManager(), this, true /* preserveOrder */, id, - this.owner.getMemberForStub(id, false), false /* shared */, + false /* shared */, startTime, ackTimeout, ackSATimeout); if (logger.isDebugEnabled()) { logger.debug("ConnectionTable: created an ordered connection: {}", result); @@ -583,7 +584,7 @@ public class ConnectionTable { ArrayList al = (ArrayList)this.threadConnectionMap.get(id); if (al == null) { - // First connection for this Stub. Make sure list for this + // First connection for this DistributedMember. Make sure list for this // stub is created if it isn't already there. 
al = new ArrayList(); @@ -651,7 +652,7 @@ public class ConnectionTable { /** * Get a new connection - * @param id the Stub on which to create the connection + * @param id the DistributedMember on which to create the connection * @param preserveOrder whether order should be preserved * @param startTime the ms clock start time * @param ackTimeout the ms ack-wait-threshold, or zero @@ -660,7 +661,7 @@ public class ConnectionTable { * @throws java.io.IOException if the connection could not be created * @throws DistributedSystemDisconnectedException */ - protected Connection get(Stub id, boolean preserveOrder, + protected Connection get(DistributedMember id, boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) throws java.io.IOException, DistributedSystemDisconnectedException { @@ -838,34 +839,38 @@ public class ConnectionTable { /** * Return true if our owner already knows that this endpoint is departing */ - protected boolean isEndpointShuttingDown(Stub stub) { - return this.owner.getMemberForStub(stub, true) == null; + protected boolean isEndpointShuttingDown(DistributedMember id) { + return giveUpOnMember(owner.getDM().getMembershipManager(), id); } + protected boolean giveUpOnMember(MembershipManager mgr, DistributedMember remoteAddr) { + return !mgr.memberExists(remoteAddr) || mgr.isShunned(remoteAddr) || mgr.shutdownInProgress(); + } + /** remove an endpoint and notify the membership manager of the departure */ - protected void removeEndpoint(Stub stub, String reason) { + protected void removeEndpoint(DistributedMember stub, String reason) { removeEndpoint(stub, reason, true); } - protected void removeEndpoint(Stub stub, String reason, boolean notifyDisconnect) { + protected void removeEndpoint(DistributedMember memberID, String reason, boolean notifyDisconnect) { if (this.closed) { return; } boolean needsRemoval = false; synchronized (this.orderedConnectionMap) { - if (this.orderedConnectionMap.get(stub) != null) + if 
(this.orderedConnectionMap.get(memberID) != null) needsRemoval = true; } if (!needsRemoval) { synchronized (this.unorderedConnectionMap) { - if (this.unorderedConnectionMap.get(stub) != null) + if (this.unorderedConnectionMap.get(memberID) != null) needsRemoval = true; } } if (!needsRemoval) { ConcurrentMap cm = this.threadConnectionMap; if (cm != null) { - ArrayList al = (ArrayList)cm.get(stub); + ArrayList al = (ArrayList)cm.get(memberID); needsRemoval = al != null && al.size() > 0; } } @@ -873,14 +878,14 @@ public class ConnectionTable { if (needsRemoval) { InternalDistributedMember remoteAddress = null; synchronized (this.orderedConnectionMap) { - Object c = this.orderedConnectionMap.remove(stub); + Object c = this.orderedConnectionMap.remove(memberID); if (c instanceof Connection) { remoteAddress = ((Connection) c).getRemoteAddress(); } closeCon(reason, c); } synchronized (this.unorderedConnectionMap) { - Object c = this.unorderedConnectionMap.remove(stub); + Object c = this.unorderedConnectionMap.remove(memberID); if (remoteAddress == null && (c instanceof Connection)) { remoteAddress = ((Connection) c).getRemoteAddress(); } @@ -890,7 +895,7 @@ public class ConnectionTable { { ConcurrentMap cm = this.threadConnectionMap; if (cm != null) { - ArrayList al = (ArrayList)cm.remove(stub); + ArrayList al = (ArrayList)cm.remove(memberID); if (al != null) { synchronized (al) { for (Iterator it=al.iterator(); it.hasNext();) { @@ -912,7 +917,7 @@ public class ConnectionTable { for (Iterator it=connectingSockets.entrySet().iterator(); it.hasNext(); ) { Map.Entry entry = (Map.Entry)it.next(); ConnectingSocketInfo info = (ConnectingSocketInfo)entry.getValue(); - if (info.peerAddress.equals(stub.getInetAddress())) { + if (info.peerAddress.equals(((InternalDistributedMember)memberID).getInetAddress())) { toRemove.add(entry.getKey()); it.remove(); } @@ -925,7 +930,7 @@ public class ConnectionTable { } catch (IOException e) { if (logger.isDebugEnabled()) { - 
logger.debug("caught exception while trying to close connecting socket for {}", stub, e); + logger.debug("caught exception while trying to close connecting socket for {}", memberID, e); } } } @@ -937,7 +942,7 @@ public class ConnectionTable { synchronized (this.receivers) { for (Iterator it=receivers.iterator(); it.hasNext();) { Connection con = (Connection)it.next(); - if (stub.equals(con.getRemoteId())) { + if (memberID.equals(con.getRemoteAddress())) { it.remove(); toRemove.add(con); } @@ -947,10 +952,13 @@ public class ConnectionTable { Connection con = (Connection)it.next(); closeCon(reason, con); } - // call memberDeparted after doing the closeCon calls - // so it can recursively call removeEndpoint if (notifyDisconnect) { - owner.getMemberForStub(stub, false); + // Before the removal of TCPConduit Stub addresses this used + // to call MembershipManager.getMemberForStub, which checked + // for a shutdown in progress and threw this exception: + if (owner.getDM().shutdownInProgress()) { + throw new DistributedSystemDisconnectedException("Shutdown in progress", owner.getDM().getMembershipManager().getShutdownCause()); + } } if (remoteAddress != null) { @@ -964,11 +972,11 @@ public class ConnectionTable { } /** check to see if there are still any receiver threads for the given end-point */ - protected boolean hasReceiversFor(Stub endPoint) { + protected boolean hasReceiversFor(DistributedMember endPoint) { synchronized (this.receivers) { for (Iterator it=receivers.iterator(); it.hasNext();) { Connection con = (Connection)it.next(); - if (endPoint.equals(con.getRemoteId())) { + if (endPoint.equals(con.getRemoteAddress())) { return true; } } @@ -976,7 +984,7 @@ public class ConnectionTable { return false; } - private static void removeFromThreadConMap(ConcurrentMap cm, Stub stub, Connection c) { + private static void removeFromThreadConMap(ConcurrentMap cm, DistributedMember stub, Connection c) { if (cm != null) { ArrayList al = (ArrayList)cm.get(stub); if (al != 
null) { @@ -986,7 +994,7 @@ public class ConnectionTable { } } } - protected void removeThreadConnection(Stub stub, Connection c) { + protected void removeThreadConnection(DistributedMember stub, Connection c) { /*if (this.closed) { return; }*/ @@ -1001,7 +1009,7 @@ public class ConnectionTable { } // synchronized } // m != null } - void removeSharedConnection(String reason, Stub stub, boolean ordered, Connection c) { + void removeSharedConnection(String reason, DistributedMember stub, boolean ordered, Connection c) { if (this.closed) { return; } @@ -1054,7 +1062,7 @@ public class ConnectionTable { Iterator it = m.entrySet().iterator(); while (it.hasNext()) { Map.Entry me = (Map.Entry)it.next(); - Stub stub = (Stub)me.getKey(); + DistributedMember stub = (DistributedMember)me.getKey(); Connection c = (Connection)me.getValue(); removeFromThreadConMap(this.threadConnectionMap, stub, c); it.remove(); @@ -1079,7 +1087,7 @@ public class ConnectionTable { * from being formed or new messages from being sent * @since 5.1 */ - protected void getThreadOwnedOrderedConnectionState(Stub member, + protected void getThreadOwnedOrderedConnectionState(DistributedMember member, Map result) { ConcurrentMap cm = this.threadConnectionMap; @@ -1105,7 +1113,7 @@ public class ConnectionTable { * wait for the given incoming connections to receive at least the associated * number of messages */ - protected void waitForThreadOwnedOrderedConnectionState(Stub member, + protected void waitForThreadOwnedOrderedConnectionState(DistributedMember member, Map connectionStates) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // wisest to do this before the synchronize below List r = null; @@ -1115,14 +1123,14 @@ public class ConnectionTable { for (Iterator it=r.iterator(); it.hasNext();) { Connection con = (Connection)it.next(); if (!con.stopped && !con.isClosing() && !con.getOriginatedHere() && con.getPreserveOrder() - && member.equals(con.getRemoteId())) { 
+ && member.equals(con.getRemoteAddress())) { Long state = (Long)connectionStates.remove(Long.valueOf(con.getUniqueId())); if (state != null) { long count = state.longValue(); while (!con.stopped && !con.isClosing() && con.getMessagesReceived() < count) { if (logger.isDebugEnabled()) { logger.debug("Waiting for connection {}/{} currently={} need={}", - con.getRemoteId(), con.getUniqueId(), con.getMessagesReceived(), count); + con.getRemoteAddress(), con.getUniqueId(), con.getMessagesReceived(), count); } Thread.sleep(100); } @@ -1230,11 +1238,11 @@ public class ConnectionTable { /** * the stub we are connecting to */ - private final Stub id; + private final DistributedMember id; private final Thread connectingThread; - public PendingConnection(boolean preserveOrder, Stub id) { + public PendingConnection(boolean preserveOrder, DistributedMember id) { this.preserveOrder = preserveOrder; this.id = id; this.connectingThread = Thread.currentThread(); @@ -1279,10 +1287,9 @@ public class ConnectionTable { boolean severeAlertIssued = false; boolean suspected = false; - InternalDistributedMember targetMember = null; + DistributedMember targetMember = null; if (ackSATimeout > 0) { - targetMember = - ((GMSMembershipManager)mgr).getMemberForStub(this.id, false); + targetMember = this.id; } for (;;) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java index 5cd426f..a954814 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/MemberShunnedException.java @@ -18,6 +18,7 @@ package 
com.gemstone.gemfire.internal.tcp; import com.gemstone.gemfire.GemFireException; +import com.gemstone.gemfire.distributed.DistributedMember; /** * MemberShunnedException may be thrown to prevent ack-ing a message @@ -28,13 +29,13 @@ import com.gemstone.gemfire.GemFireException; public class MemberShunnedException extends GemFireException { private static final long serialVersionUID = -8453126202477831557L; - private Stub member; + private DistributedMember member; /** * constructor * @param member the member that was shunned */ - public MemberShunnedException(Stub member) { + public MemberShunnedException(DistributedMember member) { super(""); this.member = member; } @@ -42,7 +43,7 @@ public class MemberShunnedException extends GemFireException /** * @return the member that was shunned */ - public Stub getShunnedMember() { + public DistributedMember getShunnedMember() { return this.member; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java index fd495d9..cd711e7 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/ServerDelegate.java @@ -16,6 +16,7 @@ */ package com.gemstone.gemfire.internal.tcp; +import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.distributed.internal.membership.*; import com.gemstone.gemfire.i18n.LogWriterI18n; @@ -34,7 +35,7 @@ import com.gemstone.gemfire.i18n.LogWriterI18n; public interface ServerDelegate { public void receive( DistributionMessage message, int bytesRead, - Stub connId ); + 
DistributedMember connId ); public LogWriterI18n getLogger(); @@ -42,5 +43,5 @@ public interface ServerDelegate { * Called when a possibly new member is detected by receiving a direct channel * message from him. */ - public void newMemberConnected(InternalDistributedMember member, Stub id); + public void newMemberConnected(InternalDistributedMember member); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4bf4557b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java deleted file mode 100644 index 2e4b91b..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/tcp/Stub.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.tcp; - -import java.io.*; -import java.net.*; - -import com.gemstone.gemfire.DataSerializable; -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.internal.InternalDataSerializer; - -/** Stub represents an ip address and port. 
- - @author Bruce Schuchardt - @since 2.0 - - */ - -public class Stub implements Externalizable, DataSerializable -{ - private InetAddress inAddr; - private int port; - private int viewID; - - public Stub() { - // public default needed for deserialization - } - - public Stub(InetAddress addr, int port, int vmViewID) { - viewID = vmViewID; - inAddr = addr; - this.port = port; - } - - @Override - public boolean equals(Object o) { - if (o == this) { - return true; - } - if (o instanceof Stub) { - Stub s = (Stub)o; - boolean result; - if (inAddr == null) - result = s.inAddr == null; - else - result = inAddr.equals(s.inAddr); - result = result && port == s.port; - if (this.viewID != 0 && s.viewID != 0) { - result = result && (this.viewID == s.viewID); - } - return result; - } - else { - return false; - } - } - - // hashCode equates to the address hashCode for fast connection lookup - @Override - public int hashCode() { - // do not use viewID in hashCode because it is changed after creating a stub - int result = 0; - // result += inAddr.hashCode(); // useless - result += port; - return result; - } - - public void setViewID(int viewID) { - this.viewID = viewID; - } - - public int getPort() { - return port; - } - - public int getViewID() { - return this.viewID; - } - - public InetAddress getInetAddress() { - return inAddr; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(80); - sb.append("tcp://"); - if (inAddr == null) - sb.append("<null>"); - else - sb.append(inAddr.toString()); - if (this.viewID != 0) { - sb.append("<v"+this.viewID+">"); - } - sb.append(":" + port); - return sb.toString(); - } - - /** - * Writes the contents of this <code>Stub</code> to a - * <code>DataOutput</code>. 
- * - * @since 3.0 - */ - public void toData(DataOutput out) - throws IOException - { - DataSerializer.writeInetAddress(inAddr, out); - out.writeInt(port); - out.writeInt(viewID); - } - - /** - * Reads the contents of this <code>Stub</code> from a - * <code>DataOutput</code>. - * - * @since 3.0 - */ - public void fromData(DataInput in) - throws IOException, ClassNotFoundException - { - inAddr = DataSerializer.readInetAddress(in); - this.port = in.readInt(); - this.viewID = in.readInt(); - } - - /** - * static factory method - * @since 5.0.2 - */ - public static Stub createFromData(DataInput in) - throws IOException, ClassNotFoundException - { - Stub result = new Stub(); - InternalDataSerializer.invokeFromData(result, in); - return result; - } - - public void writeExternal(ObjectOutput os) - throws IOException - { - this.toData(os); - } - - public void readExternal(ObjectInput is) - throws IOException, ClassNotFoundException - { - this.fromData(is); - } -}
