New unit tests and code clean-up. New unit tests to increase code coverage. I also tinkered with getting EclEmma output from DUnit ChildVMs, but EclEmma seems to run in a shutdownHook in parallel with the DUnitLauncher shutdownHook.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bd43c341 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bd43c341 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bd43c341 Branch: refs/heads/feature/GEODE-291 Commit: bd43c341e8483df7fff1caabee666791e75f0dd4 Parents: 8f9b321 Author: Bruce Schuchardt <[email protected]> Authored: Mon Dec 7 13:30:00 2015 -0800 Committer: Bruce Schuchardt <[email protected]> Committed: Mon Dec 7 13:30:00 2015 -0800 ---------------------------------------------------------------------- .../internal/DistributionMessage.java | 2 +- .../internal/membership/MemberAttributes.java | 131 +---- .../membership/gms/membership/GMSJoinLeave.java | 44 +- .../gms/messages/HeartbeatMessage.java | 2 +- .../gms/messages/HeartbeatRequestMessage.java | 2 +- .../gms/messages/InstallViewMessage.java | 2 +- .../gms/messages/JoinResponseMessage.java | 10 +- .../membership/gms/messages/ViewAckMessage.java | 2 +- .../gms/messenger/AddressManager.java | 21 +- .../membership/gms/messenger/GMSPingPonger.java | 22 +- .../membership/gms/messenger/JGAddress.java | 23 +- .../gms/messenger/JGroupsMessenger.java | 385 +++++++-------- .../membership/gms/messenger/Transport.java | 2 +- .../internal/tcpserver/TcpServer.java | 2 +- .../internal/i18n/ParentLocalizedStrings.java | 4 +- .../gemfire/cache30/ReconnectDUnitTest.java | 2 +- .../internal/DistributionManagerDUnitTest.java | 1 + .../membership/MembershipJUnitTest.java | 116 ++++- .../membership/gms/MembershipManagerHelper.java | 1 + .../messenger/JGroupsMessengerJUnitTest.java | 481 ++++++++++++++++--- .../src/test/java/dunit/RemoteDUnitVMIF.java | 2 + .../src/test/java/dunit/standalone/ChildVM.java | 11 +- .../java/dunit/standalone/DUnitLauncher.java | 24 + .../java/dunit/standalone/ProcessManager.java | 14 +- .../java/dunit/standalone/RemoteDUnitVM.java | 7 +- 25 files changed, 821 
insertions(+), 492 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java index 23f9dee..80ae4c0 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/DistributionMessage.java @@ -174,7 +174,7 @@ public abstract class DistributionMessage } } - public final boolean isDirectAck() { + public boolean isDirectAck() { return acker != null; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java index 7cd89d7..2d4d980 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/MemberAttributes.java @@ -16,26 +16,20 @@ */ package com.gemstone.gemfire.distributed.internal.membership; -import com.gemstone.gemfire.DataSerializable; -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.InternalGemFireError; -import com.gemstone.gemfire.distributed.DurableClientAttributes; -import com.gemstone.gemfire.internal.HeapDataOutputStream; 
-import com.gemstone.gemfire.internal.Version; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import java.util.ArrayList; +import java.util.List; +import java.util.StringTokenizer; -import java.io.*; -import java.util.*; +import com.gemstone.gemfire.distributed.DurableClientAttributes; /** - * The attributes of a distributed member. These attributes are stored as - * the AdditionalBytes in JGroups' IpAddress. + * The attributes of a distributed member. This is largely deprecated as + * GMSMember holds all of this information. * * @author Kirk Lund * @since 5.0 */ -public class MemberAttributes implements DataSerializable { - private static final long serialVersionUID = -3257772958884802693L; +public class MemberAttributes { public static final MemberAttributes INVALID = new MemberAttributes(-1, -1, -1, -1, null, null, null); @@ -68,23 +62,6 @@ public class MemberAttributes implements DataSerializable { this.durableClientAttributes = durableClientAttributes; } - /** Constructs new MemberAttributes from DataInput. */ - public MemberAttributes(byte[] b) throws IOException, ClassNotFoundException { - this.byteInfo = b; - DataInputStream in = - new DataInputStream(new ByteArrayInputStream(b)); - fromData(in); - } - - public MemberAttributes(MemberAttributes other) { - this.dcPort = other.dcPort; - this.vmPid = other.vmPid; - this.vmKind = other.vmKind; - this.name = other.name; - this.groups = other.groups; - this.durableClientAttributes = other.durableClientAttributes; - } - /** Returns direct channel port. */ public int getPort() { return this.dcPort; @@ -115,22 +92,6 @@ public class MemberAttributes implements DataSerializable { return this.durableClientAttributes; } - /** Parses comma-separated-values into array of groups (strings). 
*/ - public static String[] parseGroups(String csv) { - if (csv == null || csv.length() == 0) { - return new String[0]; - } - List groups = new ArrayList(); - StringTokenizer st = new StringTokenizer(csv, ","); - while (st.hasMoreTokens()) { - String groupName = st.nextToken().trim(); - // TODO make case insensitive - if (!groups.contains(groupName)) { // only add each group once - groups.add(groupName); - } - } - return (String[]) groups.toArray(new String[groups.size()]); - } /** Parses comma-separated-roles/groups into array of groups (strings). */ public static String[] parseGroups(String csvRoles, String csvGroups) { List<String> groups = new ArrayList<String>(); @@ -138,6 +99,8 @@ public class MemberAttributes implements DataSerializable { parseCsv(groups, csvGroups); return (String[]) groups.toArray(new String[groups.size()]); } + + private static void parseCsv(List<String> groups, String csv) { if (csv == null || csv.length() == 0) { return; @@ -151,82 +114,6 @@ public class MemberAttributes implements DataSerializable { } } - /** Writes the contents of this object to the given output. */ - public void toData(DataOutput out) throws IOException { - out.writeInt(this.dcPort); - out.writeInt(this.vmPid); - out.writeInt(this.vmKind); - DataSerializer.writeString(this.name, out); - DataSerializer.writeStringArray(this.groups, out); - DataSerializer.writeString(this.durableClientAttributes==null ? "" : this.durableClientAttributes.getId(), out); - DataSerializer.writeInteger(Integer.valueOf(this.durableClientAttributes==null ? 300 : this.durableClientAttributes.getTimeout()), out); - } - - /** Reads the contents of this object from the given input. 
*/ - public void fromData(DataInput in) - throws IOException, ClassNotFoundException { - this.dcPort = in.readInt(); - this.vmPid = in.readInt(); - this.vmKind = in.readInt(); - this.name = DataSerializer.readString(in); - this.groups = DataSerializer.readStringArray(in); - String durableId = DataSerializer.readString(in); - int durableTimeout = DataSerializer.readInteger(in).intValue(); - this.durableClientAttributes = new DurableClientAttributes(durableId, durableTimeout); - } - - private byte[] byteInfo; - - /** Returns the contents of this objects serialized as a byte array. */ - public byte[] toByteArray() { - if (byteInfo != null) { - return byteInfo; - } - try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); - toData(hdos); - byteInfo = hdos.toByteArray(); - return byteInfo; - } - catch (IOException e) { - throw new InternalGemFireError(LocalizedStrings.MemberAttributes_IOEXCEPTION_ON_A_BYTE_ARRAY_0.toLocalizedString(e)); - } - } - - public static MemberAttributes fromByteArray(byte[] bytes) { - try { - return new MemberAttributes(bytes); - } - catch (IOException e) { - throw new InternalGemFireError(LocalizedStrings.MemberAttributes_IOEXCEPTION_ON_A_BYTE_ARRAY_0.toLocalizedString(e)); - } - catch (ClassNotFoundException e) { - throw new InternalGemFireError(LocalizedStrings.MemberAttributes_CLASSNOTFOUNDEXCEPTION_IN_DESERIALIZATION_0.toLocalizedString(e)); - } - } - - /** - * Returns a string representation of the object. 
- * - * @return a string representation of the object - */ - @Override - public String toString() { - final StringBuffer sb = new StringBuffer("[MemberAttributes: "); - sb.append("dcPort=").append(this.dcPort); - sb.append(", vmPid=").append(this.vmPid); - sb.append(", vmKind=").append(this.vmKind); - sb.append(", name=").append(this.name); - sb.append(", groups=").append("("); - for (int i = 0; i < groups.length; i++) { - sb.append(groups[i]); - } - sb.append(")"); - sb.append(", durableClientAttributes=").append(this.durableClientAttributes); - sb.append("]"); - return sb.toString(); - } - /** * @return the membership view number in which this member was born */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 84a0bd7..3e767ae 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -230,7 +230,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { SearchState state = searchState; - long locatorWaitTime = services.getConfig().getLocatorWaitTime() * 1000; + long locatorWaitTime = ((long)services.getConfig().getLocatorWaitTime()) * 1000L; long timeout = services.getConfig().getJoinTimeout(); logger.debug("join timeout is set to {}", timeout); long retrySleep = JOIN_RETRY_SLEEP; @@ -560,7 +560,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.debug("JoinLeave is recording the request to be 
processed in the next membership view"); synchronized (viewRequests) { viewRequests.add(request); - viewRequests.notify(); + viewRequests.notifyAll(); } } @@ -862,7 +862,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId); Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>(); - long giveUpTime = System.currentTimeMillis() + services.getConfig().getLocatorWaitTime() * 1000; + long giveUpTime = System.currentTimeMillis() + ((long)services.getConfig().getLocatorWaitTime() * 1000L); int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2; boolean anyResponses = false; @@ -1055,7 +1055,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void processJoinResponse(JoinResponseMessage rsp) { synchronized (joinResponse) { joinResponse[0] = rsp; - joinResponse.notify(); + joinResponse.notifyAll(); } } @@ -1149,7 +1149,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { isJoined = true; synchronized(joinResponse) { - joinResponse.notify(); + joinResponse.notifyAll(); } if (!newView.getCreator().equals(this.localAddress)) { @@ -1253,7 +1253,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { && newView.getCreator().equals(localAddress)) { // view-creator logs this newView.logCrashedMemberWeights(currentView, logger); } - int failurePoint = (int) (Math.round(51 * oldWeight) / 100.0); + int failurePoint = (int) (Math.round(51.0 * oldWeight) / 100.0); if (failedWeight > failurePoint && quorumLostView != newView) { quorumLostView = newView; logger.warn("total weight lost in this view change is {} of {}. 
Quorum has been lost!", failedWeight, oldWeight); @@ -1437,30 +1437,39 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.debug("processing {}", m); switch (m.getDSFID()) { case JOIN_REQUEST: + assert m instanceof JoinRequestMessage; processJoinRequest((JoinRequestMessage) m); break; case JOIN_RESPONSE: + assert m instanceof JoinResponseMessage; processJoinResponse((JoinResponseMessage) m); break; case INSTALL_VIEW_MESSAGE: + assert m instanceof InstallViewMessage; processViewMessage((InstallViewMessage) m); break; case VIEW_ACK_MESSAGE: + assert m instanceof ViewAckMessage; processViewAckMessage((ViewAckMessage) m); break; case LEAVE_REQUEST_MESSAGE: + assert m instanceof LeaveRequestMessage; processLeaveRequest((LeaveRequestMessage) m); break; case REMOVE_MEMBER_REQUEST: + assert m instanceof RemoveMemberMessage; processRemoveRequest((RemoveMemberMessage) m); break; case FIND_COORDINATOR_REQ: + assert m instanceof FindCoordinatorRequest; processFindCoordinatorRequest((FindCoordinatorRequest) m); break; case FIND_COORDINATOR_RESP: + assert m instanceof FindCoordinatorResponse; processFindCoordinatorResponse((FindCoordinatorResponse) m); break; case NETWORK_PARTITION_MESSAGE: + assert m instanceof NetworkPartitionMessage; processNetworkPartitionMessage((NetworkPartitionMessage) m); break; default: @@ -1591,7 +1600,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (notRepliedYet.isEmpty() || (pendingRemovals != null && pendingRemovals.containsAll(notRepliedYet))) { logger.debug("All anticipated view responses received - notifying waiting thread"); waiting = false; - notify(); + notifyAll(); } else { logger.debug("Still waiting for these view replies: {}", notRepliedYet); } @@ -1616,14 +1625,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } } finally { - if (!this.waiting) { - // if we've set waiting to false due to incoming messages then - // we've discounted receiving any other responses from 
the - // remaining members due to leave/crash notification - result = pendingRemovals; - } else { - result.addAll(pendingRemovals); - this.waiting = false; + synchronized(this) { + if (!this.waiting) { + // if we've set waiting to false due to incoming messages then + // we've discounted receiving any other responses from the + // remaining members due to leave/crash notification + result = pendingRemovals; + } else { + result.addAll(pendingRemovals); + this.waiting = false; + } } } return result; @@ -1690,7 +1701,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { void shutdown() { shutdown = true; synchronized (viewRequests) { - viewRequests.notify(); + viewRequests.notifyAll(); interrupt(); } } @@ -1906,7 +1917,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { // be reused in an auto-reconnect and get a new vmViewID mbrs.addAll(joinReqs); newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs, new HashSet<InternalDistributedMember>(removalReqs)); - int size = joinReqs.size(); for (InternalDistributedMember mbr: joinReqs) { if (mbrs.contains(mbr)) { newView.setFailureDetectionPort(mbr, joinPorts.get(mbr)); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java index a116913..6662d2c 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatMessage.java @@ -48,7 +48,7 @@ public class HeartbeatMessage 
extends HighPriorityDistributionMessage { } @Override - protected void process(DistributionManager dm) { + public void process(DistributionManager dm) { throw new IllegalStateException("this message is not intended to execute in a thread pool"); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java index f7e1009..3c08e33 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/HeartbeatRequestMessage.java @@ -48,7 +48,7 @@ public class HeartbeatRequestMessage extends HighPriorityDistributionMessage{ } @Override - protected void process(DistributionManager dm) { + public void process(DistributionManager dm) { throw new IllegalStateException("this message is not intended to execute in a thread pool"); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java index 8d4cb4e..91f6918 100755 --- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java @@ -76,7 +76,7 @@ public class InstallViewMessage extends HighPriorityDistributionMessage { } @Override - protected void process(DistributionManager dm) { + public void process(DistributionManager dm) { throw new IllegalStateException("this message is not intended to execute in a thread pool"); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java index df1b3f6..c01353a 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java @@ -36,7 +36,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage { private NetView currentView; private String rejectionMessage; private InternalDistributedMember memberID; - private Object messengerData; + private byte[] messengerData; private boolean becomeCoordinator; public JoinResponseMessage(InternalDistributedMember memberID, NetView view) { @@ -76,11 +76,11 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage { return rejectionMessage; } - public Object getMessengerData() { + public byte[] getMessengerData() { return this.messengerData; } - public void setMessengerData(Object data) { + public void 
setMessengerData(byte[] data) { this.messengerData = data; } @@ -114,7 +114,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage { DataSerializer.writeObject(memberID, out); out.writeBoolean(becomeCoordinator); DataSerializer.writeString(rejectionMessage, out); - DataSerializer.writeObject(messengerData, out); + DataSerializer.writeByteArray(messengerData, out); } @Override @@ -123,7 +123,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage { memberID = DataSerializer.readObject(in); becomeCoordinator = in.readBoolean(); rejectionMessage = DataSerializer.readString(in); - messengerData = DataSerializer.readObject(in); + messengerData = DataSerializer.readByteArray(in); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewAckMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewAckMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewAckMessage.java index 00f31d6..39ade6e 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewAckMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/ViewAckMessage.java @@ -74,7 +74,7 @@ public class ViewAckMessage extends HighPriorityDistributionMessage { } @Override - protected void process(DistributionManager dm) { + public void process(DistributionManager dm) { throw new IllegalStateException("this message is not intended to execute in a thread pool"); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java 
---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java index 0fd1c6e..1169044 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/AddressManager.java @@ -23,10 +23,8 @@ import java.util.List; import org.apache.logging.log4j.Logger; import org.jgroups.Address; import org.jgroups.Event; -import org.jgroups.Message; import org.jgroups.protocols.PingData; import org.jgroups.protocols.TP; -import org.jgroups.protocols.UDP; import org.jgroups.stack.IpAddress; import org.jgroups.stack.Protocol; import org.jgroups.util.Responses; @@ -55,19 +53,16 @@ public class AddressManager extends Protocol { @Override public Object up(Event evt) { -// logger.info("AddressManager.up: " + evt); - switch (evt.getType()) { case Event.FIND_MBRS: List<Address> missing = (List<Address>)evt.getArg(); -// logger.debug("AddressManager.FIND_MBRS processing {}", missing); + Responses responses = new Responses(false); for (Address laddr: missing) { try { if (laddr instanceof JGAddress) { PingData pd = new PingData(laddr, true, laddr.toString(), newIpAddress(laddr)); -// logger.debug("AddressManager.FIND_MBRS adding response {}", pd); responses.addResponse(pd, false); updateUDPCache(pd); } @@ -96,17 +91,13 @@ public class AddressManager extends Protocol { findPingDataMethod(); } if (setPingData != null) { - Exception problem = null; try { setPingData.invoke(transport, new Object[]{pd}); - } catch (InvocationTargetException e) { - problem = e; - } catch (IllegalAccessException e) { - problem = e; - } - if (problem != null && !warningLogged) { - log.warn("Unable to update 
JGroups address cache - this may affect performance", problem); - warningLogged = true; + } catch (InvocationTargetException | IllegalAccessException e) { + if (!warningLogged) { + log.warn("Unable to update JGroups address cache - this may affect performance", e); + warningLogged = true; + } } } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java index fb32254..e2951ee 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSPingPonger.java @@ -35,23 +35,31 @@ public class GMSPingPonger { } public void sendPongMessage(JChannel channel, Address src, Address dest) throws Exception { - channel.send(createJGMessage(pongInBytes, src, dest, Version.CURRENT_ORDINAL)); + channel.send(createPongMessage(src, dest)); } public Message createPongMessage(Address src, Address dest) { return createJGMessage(pongInBytes, src, dest, Version.CURRENT_ORDINAL); } + public Message createPingMessage(Address src, Address dest) { + return createJGMessage(pingInBytes, src, dest, Version.CURRENT_ORDINAL); + } + public void sendPingMessage(JChannel channel, Address src, JGAddress dest) throws Exception { - channel.send(createJGMessage(pingInBytes, src, dest, Version.CURRENT_ORDINAL)); + channel.send(createPingMessage(src, dest)); } private Message createJGMessage(byte[] msgBytes, Address src, Address dest, short version) { - Message msg = new Message(); - msg.setDest(dest); - 
msg.setSrc(src); - msg.setObject(msgBytes); - return msg; + Message msg = new Message(); + msg.setDest(dest); + msg.setSrc(src); + msg.setObject(msgBytes); + msg.setFlag(Message.Flag.NO_RELIABILITY); + msg.setFlag(Message.Flag.NO_FC); + msg.setFlag(Message.Flag.DONT_BUNDLE); + msg.setFlag(Message.Flag.OOB); + return msg; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java index 1380eb2..6ddafa0 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGAddress.java @@ -84,22 +84,6 @@ public class JGAddress extends UUID { } - private void setAddressToLocalHost() { - try { - ip_addr=InetAddress.getLocalHost(); // get first NIC found (on multi-homed systems) - } - catch(Exception e) { - ip_addr=null; - } - if(ip_addr == null) { - try { - ip_addr=InetAddress.getByName(null); - } - catch(UnknownHostException e) { - } - } - } - public final InetAddress getInetAddress() {return ip_addr;} public final int getPort() {return port;} @@ -112,6 +96,7 @@ public class JGAddress extends UUID { } + @Override public String toString() { StringBuilder sb=new StringBuilder(); @@ -137,6 +122,7 @@ public class JGAddress extends UUID { } + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { try { readFrom(in); @@ -146,6 +132,7 @@ public class JGAddress extends UUID { } } + @Override public void writeExternal(ObjectOutput out) throws IOException { try { 
writeTo(out); @@ -155,6 +142,7 @@ public class JGAddress extends UUID { } } + @Override public void writeTo(DataOutput out) throws Exception { if(ip_addr != null) { byte[] address=ip_addr.getAddress(); // 4 bytes (IPv4) or 16 bytes (IPv6) @@ -180,6 +168,7 @@ public class JGAddress extends UUID { return leastSigBits; } + @Override public void readFrom(DataInput in) throws Exception { int len=in.readByte(); if(len > 0 && (len != Global.IPV4_SIZE && len != Global.IPV6_SIZE)) @@ -202,6 +191,7 @@ public class JGAddress extends UUID { leastSigBits = in.readLong(); } + @Override public int size() { // length (1 bytes) + 4 bytes for port int tmp_size=Global.BYTE_SIZE+ Global.SHORT_SIZE +Global.SHORT_SIZE @@ -213,6 +203,7 @@ public class JGAddress extends UUID { return tmp_size; } + @Override public JGAddress copy() { JGAddress result = new JGAddress(); result.mostSigBits = mostSigBits; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 4e68b63..326491a 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -53,6 +53,7 @@ import org.jgroups.Message; import org.jgroups.Message.Flag; import org.jgroups.Message.TransientFlag; import org.jgroups.Receiver; +import org.jgroups.ReceiverAdapter; import org.jgroups.View; import org.jgroups.ViewId; import org.jgroups.conf.ClassConfigurator; @@ -64,6 +65,7 @@ import org.jgroups.util.UUID; 
import com.gemstone.gemfire.DataSerializer; import com.gemstone.gemfire.ForcedDisconnectException; import com.gemstone.gemfire.GemFireConfigException; +import com.gemstone.gemfire.GemFireIOException; import com.gemstone.gemfire.SystemConnectException; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.DurableClientAttributes; @@ -100,13 +102,6 @@ public class JGroupsMessenger implements Messenger { private static final Logger logger = Services.getLogger(); /** - * The system property that specifies the name of a file from which to read - * Jgroups configuration information - */ - public static final String JGROUPS_CONFIG = System - .getProperty("geode.jgroups_config_file"); - - /** * The location (in the product) of the locator Jgroups config file. */ private static final String DEFAULT_JGROUPS_TCP_CONFIG = "com/gemstone/gemfire/distributed/internal/membership/gms/messenger/jgroups-config.xml"; @@ -138,7 +133,7 @@ public class JGroupsMessenger implements Messenger { private GMSPingPonger pingPonger = new GMSPingPonger(); - private volatile long pongsReceived; + protected volatile long pongsReceived; /** * A set that contains addresses that we have logged JGroups IOExceptions for in the @@ -173,23 +168,15 @@ public class JGroupsMessenger implements Messenger { InputStream is= null; - if (JGROUPS_CONFIG != null) { - File file = new File(JGROUPS_CONFIG); - if (!file.exists()) { - throw new GemFireConfigException(LocalizedStrings.GroupMembershipService_JGROUPS_CONFIGURATION_FILE_0_DOES_NOT_EXIST.toLocalizedString(JGROUPS_CONFIG)); - } + String r = null; + if (transport.isMcastEnabled()) { + r = DEFAULT_JGROUPS_MCAST_CONFIG; + } else { + r = DEFAULT_JGROUPS_TCP_CONFIG; } - else { - String r = null; - if (transport.isMcastEnabled()) { - r = DEFAULT_JGROUPS_MCAST_CONFIG; - } else { - r = DEFAULT_JGROUPS_TCP_CONFIG; - } - is = ClassPathLoader.getLatest().getResourceAsStream(getClass(), r); - if (is == 
null) { - throw new GemFireConfigException(LocalizedStrings.GroupMembershipService_CANNOT_FIND_0.toLocalizedString(r)); - } + is = ClassPathLoader.getLatest().getResourceAsStream(getClass(), r); + if (is == null) { + throw new GemFireConfigException(LocalizedStrings.GroupMembershipService_CANNOT_FIND_0.toLocalizedString(r)); } String properties; @@ -198,11 +185,7 @@ public class JGroupsMessenger implements Messenger { //properties = config.getProtocolStackString(); StringBuffer sb = new StringBuffer(3000); BufferedReader br; - if (JGROUPS_CONFIG != null) { - br = new BufferedReader(new InputStreamReader(is)); - } else { - br = new BufferedReader(new InputStreamReader(is, "US-ASCII")); - } + br = new BufferedReader(new InputStreamReader(is, "US-ASCII")); String input; while ((input=br.readLine()) != null) { sb.append(input); @@ -354,7 +337,7 @@ public class JGroupsMessenger implements Messenger { public void stop() { if (this.myChannel != null) { if ((services.isShutdownDueToForcedDisconnect() && services.isAutoReconnectEnabled()) || services.getManager().isReconnectingDS()) { - + // leave the channel open for reconnect attempts } else { this.myChannel.close(); @@ -396,12 +379,11 @@ public class JGroupsMessenger implements Messenger { * recipient.<p> * see Transport._send() */ - public void handleJGroupsIOException(IOException e, Message msg, Address dest) { + public void handleJGroupsIOException(IOException e, Address dest) { if (addressesWithioExceptionsProcessed.contains(dest)) { return; } addressesWithioExceptionsProcessed.add(dest); - logger.info("processing JGroups IOException: " + e.getMessage()); NetView v = this.view; JGAddress jgMbr = (JGAddress)dest; if (jgMbr != null && v != null) { @@ -444,18 +426,18 @@ public class JGroupsMessenger implements Messenger { logger.info("Unable to find getPhysicallAddress method in UDP - parsing its address instead"); } - if (this.jgAddress == null) { - String addr = udp.getLocalPhysicalAddress(); - int cidx = 
addr.lastIndexOf(':'); // IPv6 literals might have colons - String host = addr.substring(0, cidx); - int jgport = Integer.parseInt(addr.substring(cidx+1, addr.length())); - try { - this.jgAddress = new JGAddress(logicalAddress, new IpAddress(InetAddress.getByName(host), jgport)); - } catch (UnknownHostException e) { - myChannel.disconnect(); - throw new SystemConnectException("unable to initialize jgroups address", e); - } - } +// if (this.jgAddress == null) { +// String addr = udp.getLocalPhysicalAddress(); +// int cidx = addr.lastIndexOf(':'); // IPv6 literals might have colons +// String host = addr.substring(0, cidx); +// int jgport = Integer.parseInt(addr.substring(cidx+1, addr.length())); +// try { +// this.jgAddress = new JGAddress(logicalAddress, new IpAddress(InetAddress.getByName(host), jgport)); +// } catch (UnknownHostException e) { +// myChannel.disconnect(); +// throw new SystemConnectException("unable to initialize jgroups address", e); +// } +// } } // install the address in the JGroups channel protocols @@ -563,15 +545,13 @@ public class JGroupsMessenger implements Messenger { boolean useMcast = false; if (services.getConfig().getTransport().isMcastEnabled()) { - useMcast = services.getManager().isMulticastAllowed() - && (msg.getMulticast() || allDestinations); + if (msg.getMulticast() || allDestinations) { + useMcast = services.getManager().isMulticastAllowed(); + } } if (logger.isDebugEnabled() && reliably) { - String recips = "multicast"; - if (!useMcast) { - recips = Arrays.toString(msg.getRecipients()); - } + String recips = useMcast? 
"multicast" : Arrays.toString(msg.getRecipients()); logger.debug("sending via JGroups: [{}] recipients: {}", msg, recips); } @@ -579,22 +559,20 @@ public class JGroupsMessenger implements Messenger { if (useMcast) { + long startSer = theStats.startMsgSerialization(); + Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL); + theStats.endMsgSerialization(startSer); + Exception problem = null; try { - long startSer = theStats.startMsgSerialization(); - Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL); jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK); if (!reliably) { jmsg.setFlag(Message.Flag.NO_RELIABILITY); } - theStats.endMsgSerialization(startSer); theStats.incSentBytes(jmsg.getLength()); logger.trace("Sending JGroups message: {}", jmsg); myChannel.send(jmsg); } - catch (IllegalArgumentException e) { - problem = e; - } catch (Exception e) { logger.debug("caught unexpected exception", e); Throwable cause = e.getCause(); @@ -603,14 +581,12 @@ public class JGroupsMessenger implements Messenger { } else { problem = e; } - } - if (problem != null) { if (services.getShutdownCause() != null) { - Throwable cause = services.getShutdownCause(); + Throwable shutdownCause = services.getShutdownCause(); // If ForcedDisconnectException occurred then report it as actual // problem. - if (cause instanceof ForcedDisconnectException) { - problem = (Exception) cause; + if (shutdownCause instanceof ForcedDisconnectException) { + problem = (Exception) shutdownCause; } else { Throwable ne = problem; while (ne.getCause() != null) { @@ -626,83 +602,83 @@ public class JGroupsMessenger implements Messenger { } // useMcast else { // ! 
useMcast int len = destinations.length; - List<GMSMember> calculatedMembers; // explicit list of members - int calculatedLen; // == calculatedMembers.len - if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all - // Grab a copy of the current membership - NetView v = services.getJoinLeave().getView(); - - // Construct the list - calculatedLen = v.size(); - calculatedMembers = new LinkedList<GMSMember>(); - for (int i = 0; i < calculatedLen; i ++) { - InternalDistributedMember m = (InternalDistributedMember)v.get(i); - calculatedMembers.add((GMSMember)m.getNetMember()); - } - } // send to all - else { // send to explicit list - calculatedLen = len; - calculatedMembers = new LinkedList<GMSMember>(); - for (int i = 0; i < calculatedLen; i ++) { - calculatedMembers.add((GMSMember)destinations[i].getNetMember()); - } - } // send to explicit list - Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>(); - long startSer = theStats.startMsgSerialization(); - boolean firstMessage = true; - for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) { - GMSMember mbr = it.next(); - short version = mbr.getVersionOrdinal(); - if ( !messages.containsKey(version) ) { - Message jmsg = createJGMessage(msg, local, version); - messages.put(version, jmsg); - if (firstMessage) { - theStats.incSentBytes(jmsg.getLength()); - firstMessage = false; - } - } + List<GMSMember> calculatedMembers; // explicit list of members + int calculatedLen; // == calculatedMembers.len + if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all + // Grab a copy of the current membership + NetView v = services.getJoinLeave().getView(); + + // Construct the list + calculatedLen = v.size(); + calculatedMembers = new LinkedList<GMSMember>(); + for (int i = 0; i < calculatedLen; i ++) { + InternalDistributedMember m = (InternalDistributedMember)v.get(i); + calculatedMembers.add((GMSMember)m.getNetMember()); } - 
theStats.endMsgSerialization(startSer); - Collections.shuffle(calculatedMembers); - int i=0; - for (GMSMember mbr: calculatedMembers) { - JGAddress to = new JGAddress(mbr); - short version = mbr.getVersionOrdinal(); - Message jmsg = (Message)messages.get(version); - Exception problem = null; - try { - Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg; - if (!reliably) { - jmsg.setFlag(Message.Flag.NO_RELIABILITY); - } - tmp.setDest(to); - tmp.setSrc(this.jgAddress); - logger.trace("Unicasting to {}", to); - myChannel.send(tmp); + } // send to all + else { // send to explicit list + calculatedLen = len; + calculatedMembers = new LinkedList<GMSMember>(); + for (int i = 0; i < calculatedLen; i ++) { + calculatedMembers.add((GMSMember)destinations[i].getNetMember()); + } + } // send to explicit list + Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>(); + long startSer = theStats.startMsgSerialization(); + boolean firstMessage = true; + for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) { + GMSMember mbr = it.next(); + short version = mbr.getVersionOrdinal(); + if ( !messages.containsKey(version) ) { + Message jmsg = createJGMessage(msg, local, version); + messages.put(version, jmsg); + if (firstMessage) { + theStats.incSentBytes(jmsg.getLength()); + firstMessage = false; } - catch (Exception e) { - problem = e; + } + } + theStats.endMsgSerialization(startSer); + Collections.shuffle(calculatedMembers); + int i=0; + for (GMSMember mbr: calculatedMembers) { + JGAddress to = new JGAddress(mbr); + short version = mbr.getVersionOrdinal(); + Message jmsg = (Message)messages.get(version); + Exception problem = null; + try { + Message tmp = (i < (calculatedLen-1)) ? 
jmsg.copy(true) : jmsg; + if (!reliably) { + jmsg.setFlag(Message.Flag.NO_RELIABILITY); } - if (problem != null) { - if (services.getManager().getShutdownCause() != null) { - Throwable cause = services.getManager().getShutdownCause(); - // If ForcedDisconnectException occurred then report it as actual - // problem. - if (cause instanceof ForcedDisconnectException) { - problem = (Exception) cause; - } else { - Throwable ne = problem; - while (ne.getCause() != null) { - ne = ne.getCause(); - } - ne.initCause(services.getManager().getShutdownCause()); + tmp.setDest(to); + tmp.setSrc(this.jgAddress); + logger.trace("Unicasting to {}", to); + myChannel.send(tmp); + } + catch (Exception e) { + problem = e; + } + if (problem != null) { + Throwable cause = services.getShutdownCause(); + if (cause != null) { + // If ForcedDisconnectException occurred then report it as actual + // problem. + if (cause instanceof ForcedDisconnectException) { + problem = (Exception) cause; + } else { + Throwable ne = problem; + while (ne.getCause() != null) { + ne = ne.getCause(); } + ne.initCause(cause); } + } final String channelClosed = LocalizedStrings.GroupMembershipService_CHANNEL_CLOSED.toLocalizedString(); -// services.getManager().membershipFailure(channelClosed, problem); + // services.getManager().membershipFailure(channelClosed, problem); throw new DistributedSystemDisconnectedException(channelClosed, problem); - } - } // send individually + } + } // send individually } // !useMcast // The contract is that every destination enumerated in the @@ -769,12 +745,16 @@ public class JGroupsMessenger implements Messenger { msg.setBuffer(out_stream.toByteArray()); services.getStatistics().endMsgSerialization(start); } - catch(IOException ex) { - IllegalArgumentException ia = new - IllegalArgumentException("Error serializing message"); - ia.initCause(ex); - throw ia; - //throw new IllegalArgumentException(ex.toString()); + catch(IOException | GemFireIOException ex) { + logger.warn("Error 
serializing message", ex); + if (ex instanceof GemFireIOException) { + throw (GemFireIOException)ex; + } else { + GemFireIOException ioe = new + GemFireIOException("Error serializing message"); + ioe.initCause(ex); + throw ioe; + } } return msg; } @@ -820,19 +800,19 @@ public class JGroupsMessenger implements Messenger { GMSMember m = DataSerializer.readObject(dis); result = DataSerializer.readObject(dis); - if (result instanceof DistributionMessage) { - DistributionMessage dm = (DistributionMessage)result; - // JoinRequestMessages are sent with an ID that may have been - // reused from a previous life by way of auto-reconnect, - // so we don't want to find a canonical reference for the - // request's sender ID - if (dm.getDSFID() == JOIN_REQUEST) { - sender = ((JoinRequestMessage)dm).getMemberID(); - } else { - sender = getMemberFromView(m, ordinal); - } - ((DistributionMessage)result).setSender(sender); + + DistributionMessage dm = (DistributionMessage)result; + + // JoinRequestMessages are sent with an ID that may have been + // reused from a previous life by way of auto-reconnect, + // so we don't want to find a canonical reference for the + // request's sender ID + if (dm.getDSFID() == JOIN_REQUEST) { + sender = ((JoinRequestMessage)dm).getMemberID(); + } else { + sender = getMemberFromView(m, ordinal); } + ((DistributionMessage)result).setSender(sender); services.getStatistics().endMsgDeserialization(start); } @@ -850,17 +830,23 @@ public class JGroupsMessenger implements Messenger { /** look for certain messages that may need to be altered before being sent */ - private void filterOutgoingMessage(DistributionMessage m) { + void filterOutgoingMessage(DistributionMessage m) { switch (m.getDSFID()) { case JOIN_RESPONSE: JoinResponseMessage jrsp = (JoinResponseMessage)m; - if (jrsp.getRejectionMessage() != null + if (jrsp.getRejectionMessage() == null && services.getConfig().getTransport().isMcastEnabled()) { // get the multicast message digest and pass it with 
the join response Digest digest = (Digest)this.myChannel.getProtocolStack() .getTopProtocol().down(Event.GET_DIGEST_EVT); - jrsp.setMessengerData(digest); + HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT); + try { + digest.writeTo(hdos); + } catch (Exception e) { + logger.fatal("Unable to serialize JGroups messaging digest", e); + } + jrsp.setMessengerData(hdos.toByteArray()); } break; default: @@ -868,18 +854,27 @@ public class JGroupsMessenger implements Messenger { } } - private void filterIncomingMessage(DistributionMessage m) { + void filterIncomingMessage(DistributionMessage m) { switch (m.getDSFID()) { case JOIN_RESPONSE: JoinResponseMessage jrsp = (JoinResponseMessage)m; - if (jrsp.getRejectionMessage() != null + if (jrsp.getRejectionMessage() == null && services.getConfig().getTransport().isMcastEnabled()) { - Digest digest = (Digest)jrsp.getMessengerData(); - if (digest != null) { - logger.trace("installing JGroups message digest {}", digest); - this.myChannel.getProtocolStack() - .getTopProtocol().down(new Event(Event.SET_DIGEST, digest)); + byte[] serializedDigest = jrsp.getMessengerData(); + ByteArrayInputStream bis = new ByteArrayInputStream(serializedDigest); + DataInputStream dis = new DataInputStream(bis); + try { + Digest digest = new Digest(); + digest.readFrom(dis); + if (digest != null) { + logger.trace("installing JGroups message digest {}", digest); + this.myChannel.getProtocolStack() + .getTopProtocol().down(new Event(Event.SET_DIGEST, digest)); + jrsp.setMessengerData(null); + } + } catch (Exception e) { + logger.fatal("Unable to read JGroups messaging digest", e); } } break; @@ -894,13 +889,20 @@ public class JGroupsMessenger implements Messenger { } /** - * returns the JGroups configuration string + * returns the JGroups configuration string, for testing */ public String getJGroupsStackConfig() { return this.jgStackConfig; } /** + * returns the pinger, for testing + */ + public GMSPingPonger getPingPonger() { + 
return this.pingPonger; + } + + /** * for unit testing we need to replace UDP with a fake UDP protocol */ public void setJGroupsStackConfigForTesting(String config) { @@ -954,9 +956,10 @@ public class JGroupsMessenger implements Messenger { return qc; } /** - * Puller receives incoming JGroups messages and passes them to a handler + * JGroupsReceiver receives incoming JGroups messages and passes them to a handler. + * It may be accessed through JChannel.getReceiver(). */ - class JGroupsReceiver implements Receiver { + class JGroupsReceiver extends ReceiverAdapter { @Override public void receive(Message jgmsg) { @@ -970,6 +973,9 @@ public class JGroupsMessenger implements Messenger { //Respond to ping messages sent from other systems that are in a auto reconnect state byte[] contents = jgmsg.getBuffer(); + if (contents == null) { + return; + } if (pingPonger.isPingMessage(contents)) { try { pingPonger.sendPongMessage(myChannel, jgAddress, jgmsg.getSrc()); @@ -985,45 +991,27 @@ public class JGroupsMessenger implements Messenger { Object o = readJGMessage(jgmsg); if (o == null) { - logger.warn(LocalizedMessage.create( - LocalizedStrings.GroupMembershipService_MEMBERSHIP_GEMFIRE_RECEIVED_NULL_MESSAGE_FROM__0, String.valueOf(jgmsg))); - logger.warn(LocalizedMessage.create( - LocalizedStrings.GroupMembershipService_MEMBERSHIP_MESSAGE_HEADERS__0, jgmsg.printObjectHeaders())); - return; - } else if ( !(o instanceof DistributionMessage) ) { - logger.warn("Received something other than a message from " + jgmsg.getSrc() + ": " + o); return; } DistributionMessage msg = (DistributionMessage)o; + assert msg.getSender() != null; // admin-only VMs don't have caches, so we ignore cache operations // multicast to them, avoiding deserialization cost and classpath // problems if ( (services.getConfig().getTransport().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE) && (msg instanceof DistributedCacheOperation.CacheOperationMessage)) { - if (logger.isTraceEnabled()) - 
logger.trace("Membership: admin VM discarding cache operation message {}", jgmsg.getObject()); return; } msg.resetTimestamp(); msg.setBytesRead(jgmsg.getLength()); - if (msg.getSender() == null) { - Exception e = new Exception(LocalizedStrings.GroupMembershipService_NULL_SENDER.toLocalizedString()); - logger.warn(LocalizedMessage.create( - LocalizedStrings.GroupMembershipService_MEMBERSHIP_GEMFIRE_RECEIVED_A_MESSAGE_WITH_NO_SENDER_ADDRESS), e); - } - try { - if (logger.isTraceEnabled()) { - logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender()); - } + logger.trace("JGroupsMessenger dispatching {} from {}", msg, msg.getSender()); filterIncomingMessage(msg); - MessageHandler h = getMessageHandler(msg); - logger.trace("Handler for this message is {}", h); - h.processMessage(msg); + getMessageHandler(msg).processMessage(msg); } catch (MemberShunnedException e) { // message from non-member - ignore @@ -1053,36 +1041,7 @@ public class JGroupsMessenger implements Messenger { } return h; } - - - @Override - public void block() { - } - - - @Override - public void viewAccepted(View new_view) { - } - - - @Override - public void getState(OutputStream output) throws Exception { - } - - @Override - public void setState(InputStream input) throws Exception { - } - - @Override - public void suspect(Address suspected_mbr) { - } - - @Override - public void unblock() { - } - - - } // Puller class + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java index 8ba59b6..1687261 100755 --- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/Transport.java @@ -97,7 +97,7 @@ public class Transport extends UDP { catch (IOException e) { if (messenger != null /*&& e.getMessage().contains("Operation not permitted")*/) { // this is the english Oracle JDK exception condition we really want to catch - messenger.handleJGroupsIOException(e, msg, dest); + messenger.handleJGroupsIOException(e, dest); } } catch(Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java index 516fe8d..92793ae 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/tcpserver/TcpServer.java @@ -362,7 +362,7 @@ public class TcpServer { versionOrdinal = input.readShort(); } - if (log.isDebugEnabled()) { + if (log.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) { log.debug("Locator reading request from " + sock.getInetAddress() + " with version " + Version.fromOrdinal(versionOrdinal, false)); } input = new VersionedDataInputStream(input, Version.fromOrdinal( http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java 
b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java index 150b408..7bb97b9 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/i18n/ParentLocalizedStrings.java @@ -730,9 +730,9 @@ class ParentLocalizedStrings { public static final StringId GroupMembershipService_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_REMOVAL_OF__0 = new StringId(1722, "Membership: Fault while processing view removal of {0}"); public static final StringId GroupMembershipService_MEMBERSHIP_FINISHED_VIEW_PROCESSING_VIEWID___0 = new StringId(1723, "Membership: Finished view processing viewID = {0}"); public static final StringId GroupMembershipService_MEMBERSHIP_GEMFIRE_RECEIVED_A_MESSAGE_WITH_NO_SENDER_ADDRESS = new StringId(1724, "Membership: GemFire received a message with no sender address"); - public static final StringId GroupMembershipService_MEMBERSHIP_GEMFIRE_RECEIVED_NULL_MESSAGE_FROM__0 = new StringId(1725, "Membership: GemFire received null message from {0}"); + // ok to reuse 1725 public static final StringId GroupMembershipService_MEMBERSHIP_IGNORING_SURPRISE_CONNECT_FROM_SHUNNED_MEMBER_0 = new StringId(1726, "Membership: Ignoring surprise connect from shunned member <{0}>"); - public static final StringId GroupMembershipService_MEMBERSHIP_MESSAGE_HEADERS__0 = new StringId(1727, "Membership: message headers: {0}"); + // ok to reuse 1727 // ok to reuse 1728 public static final StringId GroupMembershipService_MEMBERSHIP_PAUSING_TO_ALLOW_OTHER_CONCURRENT_PROCESSES_TO_JOIN_THE_DISTRIBUTED_SYSTEM = new StringId(1729, "Membership: Pausing to allow other concurrent processes to join the distributed system"); public static final StringId GroupMembershipService_MEMBERSHIP_PROCESSING_ADDITION__0_ = new StringId(1730, "Membership: Processing addition < {0} >"); 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java index dd6f1fa..9d0f69f 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/ReconnectDUnitTest.java @@ -581,7 +581,7 @@ public class ReconnectDUnitTest extends CacheTestCase Properties config = getDistributedSystemProperties(); config.put(DistributionConfig.ROLES_NAME, ""); config.put(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel()); - config.put("log-file", "roleLossController.log"); +// config.put("log-file", "roleLossController.log"); //creating the DS getSystem(config); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java index 82dfdb7..51771cb 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/DistributionManagerDUnitTest.java @@ -347,6 +347,7 @@ public class DistributionManagerDUnitTest extends DistributedTestCase { public void afterCreate(EntryEvent event) { try { if (playDead) { + MembershipManagerHelper.beSickMember(system); MembershipManagerHelper.playDead(system); } Thread.sleep(15000); 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java index 2ce1ca7..bee2367 100755 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/MembershipJUnitTest.java @@ -16,16 +16,21 @@ */ package com.gemstone.gemfire.distributed.internal.membership; +import static org.mockito.Mockito.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import java.io.File; import java.net.InetAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -42,16 +47,28 @@ import org.junit.experimental.categories.Category; import com.gemstone.gemfire.GemFireConfigException; import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.DMStats; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl; import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.DistributionMessage; import com.gemstone.gemfire.distributed.internal.InternalLocator; 
+import com.gemstone.gemfire.distributed.internal.SerialAckedMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil; import com.gemstone.gemfire.distributed.internal.membership.gms.ServiceConfig; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave; import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.HeartbeatRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.SuspectMembersMessage; +import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.mgr.GMSMembershipManager; import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.SocketCreator; @@ -159,6 +176,7 @@ public class MembershipJUnitTest { MembershipManager m1=null, m2=null; Locator l = null; + int mcastPort = AvailablePortHelper.getRandomAvailableUDPPort(); try { @@ -175,9 +193,11 @@ public class MembershipJUnitTest { // create configuration objects Properties nonDefault = new Properties(); nonDefault.put(DistributionConfig.DISABLE_TCP_NAME, "true"); - nonDefault.put(DistributionConfig.MCAST_PORT_NAME, "0"); + 
nonDefault.put(DistributionConfig.MCAST_PORT_NAME, String.valueOf(mcastPort)); nonDefault.put(DistributionConfig.LOG_FILE_NAME, ""); -// nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "finest"); + nonDefault.put(DistributionConfig.LOG_LEVEL_NAME, "fine"); + nonDefault.put(DistributionConfig.GROUPS_NAME, "red, blue"); + nonDefault.put(DistributionConfig.MEMBER_TIMEOUT_NAME, "2000"); nonDefault.put(DistributionConfig.LOCATORS_NAME, localHost.getHostName()+'['+port+']'); DistributionConfigImpl config = new DistributionConfigImpl(nonDefault); RemoteTransportConfig transport = new RemoteTransportConfig(config, @@ -222,7 +242,38 @@ public class MembershipJUnitTest { } } } - + + System.out.println("testing multicast availability"); + assertTrue(m1.testMulticast()); + + System.out.println("multicasting SerialAckedMessage from m1 to m2"); + SerialAckedMessage msg = new SerialAckedMessage(); + msg.setRecipient(m2.getLocalMember()); + msg.setMulticast(true); + m1.send(new InternalDistributedMember[] {m2.getLocalMember()}, msg, null); + giveUp = System.currentTimeMillis() + 5000; + boolean verified = false; + Throwable problem = null; + while (giveUp > System.currentTimeMillis()) { + try { + verify(listener2).messageReceived(isA(SerialAckedMessage.class)); + verified = true; + break; + } catch (Error e) { + problem = e; + Thread.sleep(500); + } + } + if (!verified) { + if (problem != null) { + problem.printStackTrace(); + } + fail("Expected a multicast message to be received"); + } + + // let the managers idle for a while and get used to each other + Thread.sleep(4000l); + m2.shutdown(); assertTrue(!m2.isConnected()); @@ -284,5 +335,64 @@ public class MembershipJUnitTest { assertEquals(600+4, str.length()); } + @Test + public void testMessagesThrowExceptionIfProcessed() throws Exception { + DistributionManager dm = null; + try { + new HeartbeatMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new 
HeartbeatRequestMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new InstallViewMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new JoinRequestMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new JoinResponseMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new LeaveRequestMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new RemoveMemberMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new SuspectMembersMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + try { + new ViewAckMessage().process(dm); + fail("expected an exception to be thrown"); + } catch (Exception e) { + // okay + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bd43c341/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/MembershipManagerHelper.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/MembershipManagerHelper.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/MembershipManagerHelper.java index c5141de..f764ef9 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/MembershipManagerHelper.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/MembershipManagerHelper.java @@ -156,6 +156,7 @@ public class MembershipManagerHelper public static void crashDistributedSystem(final DistributedSystem msys) { 
msys.getLogWriter().info("crashing distributed system: " + msys); MembershipManagerHelper.inhibitForcedDisconnectLogging(true); + MembershipManagerHelper.beSickMember(msys); MembershipManagerHelper.playDead(msys); GMSMembershipManager mgr = ((GMSMembershipManager)getMembershipManager(msys)); mgr.forceDisconnect("for testing");
