GEODE-1372 added unit test and some more fixes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/c5247cce Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/c5247cce Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/c5247cce Branch: refs/heads/feature/GEODE-420 Commit: c5247cce308b04c31d8c3eaa8e0cf86e54bf02d7 Parents: 6eeeead Author: Hitesh Khamesra <[email protected]> Authored: Wed Jun 1 15:27:24 2016 -0700 Committer: Hitesh Khamesra <[email protected]> Committed: Mon Aug 29 10:39:18 2016 -0700 ---------------------------------------------------------------------- .../internal/membership/NetView.java | 4 +- .../membership/gms/interfaces/Messenger.java | 12 + .../gms/locator/FindCoordinatorRequest.java | 22 +- .../gms/locator/FindCoordinatorResponse.java | 70 ++- .../membership/gms/locator/GMSLocator.java | 14 +- .../membership/gms/membership/GMSJoinLeave.java | 151 ++++-- .../gms/messages/InstallViewMessage.java | 25 + .../gms/messages/JoinRequestMessage.java | 49 +- .../gms/messages/JoinResponseMessage.java | 63 ++- .../membership/gms/messenger/GMSEncrypt.java | 139 +++--- .../gms/messenger/JGroupsMessenger.java | 486 ++++++++++++++----- .../internal/InternalDataSerializer.java | 18 + .../DistributedMulticastRegionDUnitTest.java | 48 +- .../gemfire/distributed/LocatorDUnitTest.java | 30 +- .../LocatorUDPSecurityDUnitTest.java | 28 ++ .../gms/membership/GMSJoinLeaveJUnitTest.java | 43 +- .../gms/messenger/GMSEncryptJUnitTest.java | 6 +- .../messenger/JGroupsMessengerJUnitTest.java | 201 +++++++- 18 files changed, 1099 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java index d8c95c2..7d9a84c 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/NetView.java @@ -579,7 +579,7 @@ public class NetView implements DataSerializableFixedID { InternalDataSerializer.writeSet(crashedMembers, out); DataSerializer.writeIntArray(failureDetectionPorts, out); // TODO expensive serialization - DataSerializer.writeObject(publicKeys, out); + DataSerializer.writeHashMap(publicKeys, out); } @Override @@ -592,7 +592,7 @@ public class NetView implements DataSerializableFixedID { shutdownMembers = InternalDataSerializer.readHashSet(in); crashedMembers = InternalDataSerializer.readHashSet(in); failureDetectionPorts = DataSerializer.readIntArray(in); - publicKeys = DataSerializer.readObject(in); + publicKeys = DataSerializer.readHashMap(in); } /** this will deserialize as an ArrayList */ http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java index e10f325..3e9a2dc 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java @@ -86,4 +86,16 @@ public interface Messenger extends Service { * @param state the state of that member's outgoing messaging to this member */ void waitForMessageState(InternalDistributedMember member, Map state) throws InterruptedException; + + byte[] getPublickey(InternalDistributedMember mbr); + + void setPublicKey(byte[] publickey, InternalDistributedMember mbr); + + void setClusterSecretKey(byte[] clusterSecretKey); + + byte[] getClusterSecretKey(); + + int getRequestId(); + + void initClusterKey(); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java index 5c0a1d1..c434c25 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorRequest.java @@ -27,6 +27,7 @@ import com.gemstone.gemfire.distributed.internal.DistributionManager; import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage; import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; import com.gemstone.gemfire.internal.DataSerializableFixedID; +import com.gemstone.gemfire.internal.InternalDataSerializer; import com.gemstone.gemfire.internal.Version; public class FindCoordinatorRequest extends HighPriorityDistributionMessage @@ -35,15 +36,20 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage private InternalDistributedMember memberID; private Collection<InternalDistributedMember> rejectedCoordinators; private int lastViewId; - + private byte[] myPublicKey; + private int requestId; + public FindCoordinatorRequest(InternalDistributedMember myId) { this.memberID = myId; } - public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators, int lastViewId) { + public FindCoordinatorRequest(InternalDistributedMember myId, Collection<InternalDistributedMember> rejectedCoordinators, + int lastViewId, byte[] pk, int requestId) { this.memberID = myId; this.rejectedCoordinators = rejectedCoordinators; this.lastViewId = lastViewId; + this.myPublicKey = pk; + this.requestId = requestId; } public FindCoordinatorRequest() { @@ -54,6 +60,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage return memberID; } + public byte[] getMyPublicKey() { + return myPublicKey; + } + public Collection<InternalDistributedMember> getRejectedCoordinators() { return rejectedCoordinators; } @@ -81,6 +91,10 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage public int getDSFID() { return FIND_COORDINATOR_REQ; } + + public int getRequestId() { + return requestId; + } @Override public void toData(DataOutput out) throws IOException { @@ -94,6 +108,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage out.writeInt(0); } out.writeInt(lastViewId); + out.writeInt(requestId); + InternalDataSerializer.writeByteArray(this.myPublicKey, out); } @Override @@ -105,6 +121,8 @@ public class FindCoordinatorRequest extends HighPriorityDistributionMessage this.rejectedCoordinators.add((InternalDistributedMember)DataSerializer.readObject(in)); } this.lastViewId = in.readInt(); + this.requestId = in.readInt(); + this.myPublicKey = InternalDataSerializer.readByteArray(in); } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java index 0427cb4..07f0e58 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/FindCoordinatorResponse.java @@ -19,6 +19,7 @@ package com.gemstone.gemfire.distributed.internal.membership.gms.locator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -42,12 +43,14 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage private boolean networkPartitionDetectionEnabled; private boolean usePreferredCoordinators; private boolean isShortForm; - + private byte[] coordinatorPublicKey; + + private int requestId; public FindCoordinatorResponse(InternalDistributedMember coordinator, InternalDistributedMember senderId, boolean fromView, NetView view, HashSet<InternalDistributedMember> registrants, - boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators) { + boolean networkPartitionDectionEnabled, boolean usePreferredCoordinators, byte[] pk) { this.coordinator = coordinator; this.senderId = senderId; this.fromView = fromView; @@ -56,19 +59,30 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage this.networkPartitionDetectionEnabled = networkPartitionDectionEnabled; this.usePreferredCoordinators = usePreferredCoordinators; this.isShortForm = false; + this.coordinatorPublicKey = pk; } public FindCoordinatorResponse(InternalDistributedMember coordinator, - InternalDistributedMember senderId) { + InternalDistributedMember senderId, byte[] pk, int requestId) { this.coordinator = coordinator; this.senderId = senderId; this.isShortForm = true; + this.coordinatorPublicKey = pk; + this.requestId = requestId; } public FindCoordinatorResponse() { // no-arg constructor for serialization } + public byte[] getCoordinatorPublicKey() { + return coordinatorPublicKey; + } + + public int getRequestId() { + return requestId; + } + public boolean isNetworkPartitionDetectionEnabled() { return networkPartitionDetectionEnabled; } @@ -131,6 +145,7 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage public void toData(DataOutput out) throws IOException { DataSerializer.writeObject(coordinator, out); DataSerializer.writeObject(senderId, out); + InternalDataSerializer.writeByteArray(coordinatorPublicKey, out); out.writeBoolean(isShortForm); out.writeBoolean(fromView); out.writeBoolean(networkPartitionDetectionEnabled); @@ -143,7 +158,8 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage public void fromData(DataInput in) throws IOException, ClassNotFoundException { coordinator = DataSerializer.readObject(in); senderId = DataSerializer.readObject(in); - isShortForm = in.readBoolean(); + coordinatorPublicKey = InternalDataSerializer.readByteArray(in); + isShortForm = in.readBoolean(); if (!isShortForm) { fromView = in.readBoolean(); networkPartitionDetectionEnabled = in.readBoolean(); @@ -158,4 +174,50 @@ public class FindCoordinatorResponse extends HighPriorityDistributionMessage throw new IllegalStateException("this message should not be executed"); } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + FindCoordinatorResponse other = (FindCoordinatorResponse) obj; + if (coordinator == null) { + if (other.coordinator != null) + return false; + } else if (!coordinator.equals(other.coordinator)) + return false; + if (!Arrays.equals(coordinatorPublicKey, other.coordinatorPublicKey)) + return false; + if (fromView != other.fromView) + return false; + if (isShortForm != other.isShortForm) + return false; + if (networkPartitionDetectionEnabled != other.networkPartitionDetectionEnabled) + return false; + if (registrants == null) { + if (other.registrants != null) + return false; + } else if (!registrants.equals(other.registrants)) + return false; + //as we are not sending requestId as part of FinDCoordinator resposne + /*if (requestId != other.requestId) + return false;*/ + if (senderId == null) { + if (other.senderId != null) + return false; + } else if (!senderId.equals(other.senderId)) + return false; + if (usePreferredCoordinators != other.usePreferredCoordinators) + return false; + if (view == null) { + if (other.view != null) + return false; + } else if (!view.equals(other.view)) + return false; + return true; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java index 1065214..305b497 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/locator/GMSLocator.java @@ -167,7 +167,7 @@ public class GMSLocator implements Locator, NetLocator { } } else if (request instanceof FindCoordinatorRequest) { FindCoordinatorRequest findRequest = (FindCoordinatorRequest)request; - + services.getMessenger().setPublicKey(findRequest.getMyPublicKey(), findRequest.getMemberID()); if (findRequest.getMemberID() != null) { InternalDistributedMember coord = null; @@ -227,9 +227,17 @@ public class GMSLocator implements Locator, NetLocator { } synchronized(registrants) { + byte[] coordPk = null; + if(view != null) { + coordPk = (byte[])view.getPublicKey(coord); + } + if (coordPk == null) { + coordPk = services.getMessenger().getPublickey(coord); + } response = new FindCoordinatorResponse(coord, localAddress, - fromView, view, new HashSet<>(registrants), - this.networkPartitionDetectionEnabled, this.usePreferredCoordinators); + fromView, view, new HashSet<InternalDistributedMember>(registrants), + this.networkPartitionDetectionEnabled, this.usePreferredCoordinators, + coordPk); } } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java index 3875d38..2ce1058 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java @@ -38,6 +38,7 @@ import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient; import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.security.AuthenticationFailedException; + import org.apache.logging.log4j.Logger; import java.io.IOException; @@ -362,8 +363,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } else { logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress); int port = services.getHealthMonitor().getFailureDetectionPort(); - JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port); - services.getMessenger().send(req, state.view); + JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord), port, + services.getMessenger().getRequestId()); + //services.getMessenger().send(req, state.view); + services.getMessenger().send(req); } JoinResponseMessage response; @@ -415,6 +418,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { NetView v = response.getCurrentView(); InternalDistributedMember coord = v.getCoordinator(); if (searchState.alreadyTried.contains(coord)) { + searchState.view = response.getCurrentView(); // we already sent join request to it..so lets wait some more time here // assuming we got this response immediately, so wait for same timeout here.. long timeout = Math.max(services.getConfig().getMemberTimeout(), services.getConfig().getJoinTimeout() / 5); @@ -422,7 +426,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { response = joinResponse[0]; } else { // try on this coordinator - searchState.possibleCoordinator = coord; + searchState.view = response.getCurrentView(); response = null; } searchState.view = v; @@ -468,7 +472,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) { logger.warn("detected an attempt to start a peer using an older version of the product {}", incomingRequest.getMemberID()); - JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version"); + JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version", incomingRequest.getRequestId()); m.setRecipient(incomingRequest.getMemberID()); services.getMessenger().send(m); return; @@ -481,7 +485,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { rejection = e.getMessage(); } if (rejection != null && rejection.length() > 0) { - JoinResponseMessage m = new JoinResponseMessage(rejection); + JoinResponseMessage m = new JoinResponseMessage(rejection, 0); m.setRecipient(incomingRequest.getMemberID()); services.getMessenger().send(m); return; @@ -633,13 +637,21 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { logger.debug("Recording the request to be processed in the next membership view"); synchronized (viewRequests) { viewRequests.add(request); + if (viewCreator != null) { + boolean joinResponseSent = viewCreator.informToPendingJoinRequests(); + + if (!joinResponseSent && request instanceof JoinRequestMessage) { + JoinRequestMessage jreq = (JoinRequestMessage) request; + // this will inform about cluster-secret key, as we have authenticated at this point + JoinResponseMessage response = new JoinResponseMessage(jreq.getSender(), services.getMessenger().getClusterSecretKey(), jreq.getRequestId()); + services.getMessenger().send(response); + } + } viewRequests.notifyAll(); } - if (viewCreator != null) { - viewCreator.informToPendingJoinRequests(); - } + } - + // for testing purposes, returns a copy of the view requests for verification List<DistributionMessage> getViewRequests() { synchronized (viewRequests) { @@ -711,6 +723,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (newView != null) { viewCreator.setInitialView(newView, newView.getNewMembers(), newView.getShutdownMembers(), newView.getCrashedMembers()); } + services.getMessenger().initClusterKey(); viewCreator.setDaemon(true); logger.info("ViewCreator starting on:" + localAddress); viewCreator.start(); @@ -753,7 +766,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void sendJoinResponses(NetView newView, List<InternalDistributedMember> newMbrs) { for (InternalDistributedMember mbr : newMbrs) { - JoinResponseMessage response = new JoinResponseMessage(mbr, newView); + JoinResponseMessage response = new JoinResponseMessage(mbr, newView, 0); services.getMessenger().send(response); } } @@ -771,14 +784,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } - private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) { - for (InternalDistributedMember mbr : newMbrs) { - JoinResponseMessage response = new JoinResponseMessage(mbr, newView); - services.getMessenger().send(response); - } - } - - boolean prepareView(NetView view, List<InternalDistributedMember> newMembers) throws InterruptedException { return sendView(view, true, this.prepareProcessor); } @@ -841,7 +846,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { pendingRemovals.removeAll(view.getCrashedMembers()); viewReplyProcessor.initialize(id, responders); viewReplyProcessor.processPendingRequests(pendingLeaves, pendingRemovals); - services.getMessenger().send(msg, view); + addPublickeysToView(view); + services.getMessenger().send(msg); // only wait for responses during preparation if (preparing) { @@ -868,13 +874,32 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return true; } + private void addPublickeysToView(NetView view) { + //TODO: is this check is correct + if (services != null && services.getConfig() != null && services.getConfig().getDistributionConfig() != null) { + String sDHAlgo = services.getConfig().getDistributionConfig().getSecurityClientDHAlgo(); + if (sDHAlgo != null && !sDHAlgo.isEmpty()) { + List<InternalDistributedMember> mbrs = view.getMembers(); + Iterator<InternalDistributedMember> itr = mbrs.iterator(); + + while (itr.hasNext()) { + InternalDistributedMember mbr = itr.next(); + byte[] pk = services.getMessenger().getPublickey(mbr); + view.setPublicKey(mbr, pk); + } + } + } + } private void processViewMessage(final InstallViewMessage m) { NetView view = m.getView(); if(currentView != null && !currentView.contains(m.getSender())) { - logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView); - return; + if(this.preparedView == null || !this.preparedView.contains(m.getSender())) + { + logger.info("Ignoring the view {} from member {}, which is not in my current view {} ", view, m.getSender(), currentView); + return; + } } if (currentView != null && view.getViewId() < currentView.getViewId()) { @@ -927,7 +952,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void ackView(InstallViewMessage m) { if (!playingDead && m.getView().contains(m.getView().getCreator())) { - services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()), m.getView()); + services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing())); } } @@ -968,7 +993,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return findCoordinatorFromView(); } - FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId); + FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress, state.alreadyTried, state.viewId, + services.getMessenger().getPublickey(localAddress), services.getMessenger().getRequestId()); Set<InternalDistributedMember> possibleCoordinators = new HashSet<InternalDistributedMember>(); Set<InternalDistributedMember> coordinatorsWithView = new HashSet<InternalDistributedMember>(); @@ -988,6 +1014,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { Object o = tcpClientWrapper.sendCoordinatorFindRequest(addr, request, connectTimeout); FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse) o : null; if (response != null) { + setCoordinatorPublicKey(response); state.locatorsContacted++; if (!state.hasContactedAJoinedLocator && response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) { @@ -1084,16 +1111,35 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (state.registrants != null) { recipients.addAll(state.registrants); } - recipients.remove(localAddress); - FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId); - req.setRecipients(v.getMembers()); + recipients.remove(localAddress); + // FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey( + // localAddress), services.getMessenger().getRequestId()); + //req.setRecipients(v.getMembers()); + boolean testing = unitTesting.contains("findCoordinatorFromView"); synchronized (state.responses) { if (!testing) { state.responses.clear(); } - services.getMessenger().send(req); + + if (!services.getConfig().getDistributionConfig().getSecurityClientDHAlgo().isEmpty()) { + for (InternalDistributedMember mbr : v.getMembers()) { + Set<InternalDistributedMember> r = new HashSet<>(); + r.add(mbr); + FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey( + localAddress), services.getMessenger().getRequestId()); + req.setRecipients(r); + + services.getMessenger().send(req, v); + } + } else { + FindCoordinatorRequest req = new FindCoordinatorRequest(localAddress, state.alreadyTried, state.viewId, services.getMessenger().getPublickey( + localAddress), services.getMessenger().getRequestId()); + req.setRecipients(v.getMembers()); + + services.getMessenger().send(req, v); + } try { if (!testing) { state.responses.wait(DISCOVERY_TIMEOUT); @@ -1144,8 +1190,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { private void processJoinResponse(JoinResponseMessage rsp) { synchronized (joinResponse) { if (!this.isJoined) { - joinResponse[0] = rsp; - joinResponse.notifyAll(); + //1. our joinRequest rejected. + //2. Member which was coordinator but just now some other member became coordinator + //3. we got message with secret key, but still view is coming and that will inform the joining thread + if (rsp.getRejectionMessage() != null || rsp.getCurrentView() != null) { + joinResponse[0] = rsp; + joinResponse.notifyAll(); + } else { + //we got secret key lets add it + services.getMessenger().setClusterSecretKey(rsp.getSecretPk()); + } } } } @@ -1170,9 +1224,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { FindCoordinatorResponse resp; if (this.isJoined) { NetView v = currentView; - resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress); + resp = new FindCoordinatorResponse(v.getCoordinator(), localAddress, + services.getMessenger().getPublickey(v.getCoordinator()), req.getRequestId()); } else { - resp = new FindCoordinatorResponse(localAddress, localAddress); + resp = new FindCoordinatorResponse(localAddress, localAddress, + services.getMessenger().getPublickey(localAddress), req.getRequestId()); } resp.setRecipient(req.getMemberID()); services.getMessenger().send(resp); @@ -1183,6 +1239,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { searchState.responses.add(resp); searchState.responses.notifyAll(); } + setCoordinatorPublicKey(resp); + } + + private void setCoordinatorPublicKey(FindCoordinatorResponse response) { + if (response.getCoordinator() != null && response.getCoordinatorPublicKey() != null) + services.getMessenger().setPublicKey(response.getCoordinatorPublicKey(), response.getCoordinator()); } private void processNetworkPartitionMessage(NetworkPartitionMessage msg) { @@ -1993,36 +2055,40 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { } } - synchronized void informToPendingJoinRequests() { + synchronized boolean informToPendingJoinRequests() { + boolean joinResponseSent = false; if (!shutdown) { - return; + return joinResponseSent; } - ArrayList<DistributionMessage> requests = new ArrayList<>(); synchronized (viewRequests) { if (viewRequests.size() > 0) { requests.addAll(viewRequests); } else { - return; + return joinResponseSent; } viewRequests.clear(); } + for (DistributionMessage msg : requests) { switch (msg.getDSFID()) { case JOIN_REQUEST: - logger.info("Informing to pending join requests {}", msg); - + NetView v = currentView; + logger.info("Informing to pending join requests {} myid {} coord {}", msg, localAddress, v.getCoordinator()); if (!v.getCoordinator().equals(localAddress)) { + joinResponseSent = true; //lets inform that coordinator has been changed - JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v); + JoinResponseMessage jrm = new JoinResponseMessage(((JoinRequestMessage) msg).getMemberID(), v, ((JoinRequestMessage) msg).getRequestId()); services.getMessenger().send(jrm); } default: break; } } + + return joinResponseSent; } /** @@ -2033,7 +2099,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { void createAndSendView(List<DistributionMessage> requests) throws InterruptedException { List<InternalDistributedMember> joinReqs = new ArrayList<>(10); Map<InternalDistributedMember, Integer> joinPorts = new HashMap<>(10); - Map<InternalDistributedMember, Object> joinKeys = new HashMap<>(10); Set<InternalDistributedMember> leaveReqs = new HashSet<>(10); List<InternalDistributedMember> removalReqs = new ArrayList<>(10); List<String> removalReasons = new ArrayList<String>(10); @@ -2067,7 +2132,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { if (!joinReqs.contains(mbr)) { joinReqs.add(mbr); joinPorts.put(mbr, port); - joinKeys.put(mbr, jmsg.getPublicKey()); } break; case LEAVE_REQUEST_MESSAGE: @@ -2135,7 +2199,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { for (InternalDistributedMember mbr : joinReqs) { if (mbrs.contains(mbr)) { newView.setFailureDetectionPort(mbr, joinPorts.get(mbr)); - newView.setPublicKey(mbr, joinKeys.get(mbr)); } } if (currentView != null) { @@ -2159,7 +2222,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { return; } - sendJoinResponses(joinReqs, newView); + //we already sent whrn we got join request + //sendJoinResponses(newView, joinReqs); // send removal messages before installing the view so we stop // getting messages from members that have been kicked out @@ -2287,7 +2351,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler { // we also send a join response so that information like the multicast message digest // can be transmitted to the new members w/o including it in the view message - sendJoinResponses(newView, joinReqs); + //we already sent whrn we got join request + //sendJoinResponses(newView, joinReqs); if (markViewCreatorForShutdown && getViewCreator() != null) { shutdown = true; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java index c41584f..224fef1 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/InstallViewMessage.java @@ -110,4 +110,29 @@ public class InstallViewMessage extends HighPriorityDistributionMessage { +")"; } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + InstallViewMessage other = (InstallViewMessage) obj; + if (credentials == null) { + if (other.credentials != null) + return false; + } else if (!credentials.equals(other.credentials)) + return false; + if (kind != other.kind) + return false; + if (previousViewId != other.previousViewId) + return false; + if (view == null) { + if (other.view != null) + return false; + } else if (!view.equals(other.view)) + return false; + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java index 5545935..b282daa 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinRequestMessage.java @@ -30,25 +30,25 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage { private InternalDistributedMember memberID; private Object credentials; private int failureDetectionPort = -1; - private Object publicKey; - + private int requestId; + public JoinRequestMessage(InternalDistributedMember coord, - InternalDistributedMember id, Object credentials, int fdPort) { + InternalDistributedMember id, Object credentials, int fdPort, int requestId) { super(); setRecipient(coord); this.memberID = id; this.credentials = credentials; - this.publicKey = null; this.failureDetectionPort = fdPort; + this.requestId = requestId; } public JoinRequestMessage() { // no-arg constructor for serialization } - public void setPublicKey(Object key) { - this.publicKey = key; + public int getRequestId() { + return requestId; } - + @Override public int getDSFID() { return JOIN_REQUEST; @@ -67,10 +67,6 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage { return credentials; } - public Object getPublicKey() { - return publicKey; - } - @Override public String toString() { return getShortClassName() + "(" + memberID + (credentials==null? ")" : "; with credentials)") + " failureDetectionPort:" + failureDetectionPort; @@ -85,24 +81,49 @@ public class JoinRequestMessage extends HighPriorityDistributionMessage { public void toData(DataOutput out) throws IOException { DataSerializer.writeObject(memberID, out); DataSerializer.writeObject(credentials, out); - DataSerializer.writeObject(publicKey, out); DataSerializer.writePrimitiveInt(failureDetectionPort, out); // preserve the multicast setting so the receiver can tell // if this is a mcast join request out.writeBoolean(getMulticast()); + out.writeInt(requestId); } @Override public void fromData(DataInput in) throws IOException, ClassNotFoundException { memberID = DataSerializer.readObject(in); credentials = DataSerializer.readObject(in); - publicKey = DataSerializer.readObject(in); failureDetectionPort = DataSerializer.readPrimitiveInt(in); setMulticast(in.readBoolean()); + requestId = in.readInt(); } public int getFailureDetectionPort() { return failureDetectionPort; } - + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + JoinRequestMessage other = (JoinRequestMessage) obj; + if (credentials == null) { + if (other.credentials != null) + return false; + } else if (!credentials.equals(other.credentials)) + return false; + if (failureDetectionPort != other.failureDetectionPort) + return false; + if (memberID == null) { + if (other.memberID != null) + return false; + } else if (!memberID.equals(other.memberID)) + return false; + if (requestId != other.requestId) + return false; + return true; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java index ad9c319..ff20a4e 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messages/JoinResponseMessage.java @@ -20,6 +20,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -37,20 +38,39 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage { private String rejectionMessage; private InternalDistributedMember memberID; private byte[] messengerData; - - public JoinResponseMessage(InternalDistributedMember memberID, NetView view) { + private int requestId; + private byte[] secretPk; + + public JoinResponseMessage(InternalDistributedMember memberID, NetView view, int requestId) { this.currentView = view; this.memberID = memberID; + this.requestId = requestId; setRecipient(memberID); } - public JoinResponseMessage(String rejectionMessage) { + public JoinResponseMessage(InternalDistributedMember memberID, byte[] sPk, int requestId) { + this.memberID = memberID; + this.requestId = requestId; + this.secretPk = sPk; + setRecipient(memberID); + } + + public JoinResponseMessage(String rejectionMessage, int requestId) { this.rejectionMessage = rejectionMessage; + this.requestId = requestId; } public JoinResponseMessage() { // no-arg constructor for serialization } + + public byte[] getSecretPk() { + return secretPk; + } + + public int getRequestId() { + return requestId; + } public NetView getCurrentView() { return currentView; @@ -101,6 +121,7 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage { DataSerializer.writeObject(memberID, out); DataSerializer.writeString(rejectionMessage, out); DataSerializer.writeByteArray(messengerData, out); + DataSerializer.writeByteArray(secretPk, out); } @Override @@ -109,6 +130,42 @@ public class JoinResponseMessage extends HighPriorityDistributionMessage { memberID = DataSerializer.readObject(in); rejectionMessage = DataSerializer.readString(in); messengerData = DataSerializer.readByteArray(in); + secretPk = DataSerializer.readByteArray(in); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + JoinResponseMessage other = (JoinResponseMessage) obj; + if (currentView == null) { + if (other.currentView != null) + return false; + } else if (!currentView.equals(other.currentView)) + return false; + if (memberID == null) { + if (other.memberID != null) + return false; + } else if (!memberID.equals(other.memberID)) + return false; + if (!Arrays.equals(messengerData, other.messengerData)) + return false; + if (rejectionMessage == null) { + if (other.rejectionMessage != null) + return false; + } else if (!rejectionMessage.equals(other.rejectionMessage)) + return false; + //as we are not sending as part of JoinResposne + /*if (requestId != other.requestId) + return false;*/ + if (!Arrays.equals(secretPk, other.secretPk)) + return false; + return true; } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java index 3d3633d..0bea614 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/GMSEncrypt.java @@ -23,7 +23,6 @@ import java.security.spec.X509EncodedKeySpec; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.crypto.Cipher; @@ -37,18 +36,14 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM import com.gemstone.gemfire.distributed.internal.membership.NetView; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; -import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.internal.logging.LogService; public class GMSEncrypt implements Cloneable{ public static long encodingsPerformed; public static long decodingsPerformed; - private static final Logger logger = LogService.getLogger(); - // Parameters for the Diffie-Hellman key exchange private static final BigInteger dhP = new BigInteger( "13528702063991073999718992897071702177131142188276542919088770094024269" @@ -82,28 +77,27 @@ public class GMSEncrypt implements Cloneable{ private ClusterEncryptor clusterEncryptor; - - protected void installView(NetView view) throws Exception { + protected void installView(NetView view) { this.view = view; this.view.setPublicKey(services.getJoinLeave().getMemberID(), getPublicKeyBytes()); } - - protected void installView(NetView view, InternalDistributedMember mbr) throws Exception { + + protected void installView(NetView view, InternalDistributedMember mbr) { this.view = view; - // this.view.setPublicKey(mbr, getPublicKeyBytes()); - // TODO remove ciphers for departed members - //addClusterKey(); } - - protected byte[] getSecretBytes() { + + protected byte[] getClusterSecretKey() { return this.clusterEncryptor.secretBytes; } - - protected synchronized void addClusterKey() throws Exception { - this.clusterEncryptor = new ClusterEncryptor(this); + + protected synchronized void initClusterSecretKey() throws Exception { + if(this.clusterEncryptor == null) { + this.clusterEncryptor = new ClusterEncryptor(this); + } } - - protected synchronized void addClusterKey(byte[] secretBytes) throws Exception { + + protected synchronized void addClusterKey(byte[] secretBytes) { + //TODO we are reseeting here, in case there is some race this.clusterEncryptor = new ClusterEncryptor(secretBytes); } @@ -134,6 +128,11 @@ public class GMSEncrypt implements Cloneable{ public byte[] decryptData(byte[] data) throws Exception { return this.clusterEncryptor.decryptBytes(data); } + + public byte[] decryptData(byte[] data, byte[] pkBytes) throws Exception { + PeerEncryptor encryptor = new PeerEncryptor(pkBytes); + return encryptor.decryptBytes(data); + } public byte[] encryptData(byte[] data) throws Exception { return this.clusterEncryptor.encryptBytes(data); @@ -142,6 +141,26 @@ public class GMSEncrypt implements Cloneable{ protected byte[] getPublicKeyBytes() { return dhPublicKey.getEncoded(); } + + protected byte[] getPublicKey(InternalDistributedMember member) { + try { + InternalDistributedMember localMbr = services.getMessenger().getMemberID(); + if (localMbr != null && localMbr.equals(member)) { + return this.dhPublicKey.getEncoded();// local one + } + return getPeerEncryptor(member).peerPublicKey.getEncoded(); + } catch (Exception e) { + throw new RuntimeException("Not found public key for member " + member, e); + } + } + + protected void setPublicKey(byte[] publickey, InternalDistributedMember mbr) { + try { + createPeerEncryptor(mbr, publickey); + }catch(Exception e) { + throw new RuntimeException("Unable to create peer encryptor " + mbr, e); + } + } @Override protected GMSEncrypt clone() throws CloneNotSupportedException { @@ -152,7 +171,6 @@ public class GMSEncrypt implements Cloneable{ X509EncodedKeySpec x509KeySpec = new X509EncodedKeySpec(this.dhPublicKey.getEncoded()); KeyFactory keyFact = KeyFactory.getInstance("DH"); - // PublicKey pubKey = keyFact.generatePublic(x509KeySpec); gmsEncrypt.dhPublicKey = keyFact.generatePublic(x509KeySpec); final String format = this.dhPrivateKey.getFormat(); System.out.println("private key format " + format); @@ -190,16 +208,20 @@ public class GMSEncrypt implements Cloneable{ } } - protected synchronized PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception{ + protected PeerEncryptor getPeerEncryptor(InternalDistributedMember member) throws Exception { PeerEncryptor result = memberToPeerEncryptor.get(member); if (result == null) { - result = createPeerEncryptor(member); + synchronized (this) { + result = memberToPeerEncryptor.get(member); + if (result == null) { + result = createPeerEncryptor(member, (byte[]) view.getPublicKey(member)); + } + } } return result; } - private PeerEncryptor createPeerEncryptor(InternalDistributedMember member) throws Exception { - byte[] peerKeyBytes = (byte[]) view.getPublicKey(member); + private PeerEncryptor createPeerEncryptor(InternalDistributedMember member, byte[] peerKeyBytes) throws Exception { PeerEncryptor result = new PeerEncryptor(peerKeyBytes); memberToPeerEncryptor.put(member, result); return result; @@ -294,7 +316,7 @@ public class GMSEncrypt implements Cloneable{ this.peerPublicKey = getPublicKey(peerPublicKeyBytes); } - public byte [] encryptBytes(byte[] data) throws Exception { + public synchronized byte[] encryptBytes(byte[] data) throws Exception { String algo = null; if (this.peerSKAlgo != null) { algo = this.peerSKAlgo; @@ -315,9 +337,8 @@ public class GMSEncrypt implements Cloneable{ } return encrypt; } - - public byte[] decryptBytes(byte[] data) throws Exception - { + + public synchronized byte[] decryptBytes(byte[] data) throws Exception { String algo = null; if (this.peerSKAlgo != null) { algo = this.peerSKAlgo; @@ -369,29 +390,27 @@ public class GMSEncrypt implements Cloneable{ return encrypt; } - protected static Cipher getEncryptCipher(String dhSKAlgo, byte[] secretBytes) - throws Exception{ - - Cipher encrypt = null; + protected static Cipher getEncryptCipher(String dhSKAlgo, byte[] secretBytes) throws Exception { - int keysize = getKeySize(dhSKAlgo); - int blocksize = getBlockSize(dhSKAlgo); + Cipher encrypt = null; - if (keysize == -1 || blocksize == -1) { - //TODO how should we do here - /*SecretKey sKey = ka.generateSecret(dhSKAlgo); - encrypt = Cipher.getInstance(dhSKAlgo); - encrypt.init(Cipher.ENCRYPT_MODE, sKey);*/ - } - else { - String dhAlgoStr = getDhAlgoStr(dhSKAlgo); + int keysize = getKeySize(dhSKAlgo); + int blocksize = getBlockSize(dhSKAlgo); - SecretKeySpec sks = new SecretKeySpec(secretBytes, 0, keysize, dhAlgoStr); - IvParameterSpec ivps = new IvParameterSpec(secretBytes, keysize, blocksize); + if (keysize == -1 || blocksize == -1) { + // TODO how should we do here, should we just throw runtime exception? + /* SecretKey sKey = ka.generateSecret(dhSKAlgo); + * encrypt = Cipher.getInstance(dhSKAlgo); + * encrypt.init(Cipher.ENCRYPT_MODE, sKey); */ + } else { - encrypt = Cipher.getInstance(dhAlgoStr + "/CBC/PKCS5Padding"); - encrypt.init(Cipher.ENCRYPT_MODE, sks, ivps); - } + String dhAlgoStr = getDhAlgoStr(dhSKAlgo); + SecretKeySpec sks = new SecretKeySpec(secretBytes, 0, keysize, dhAlgoStr); + IvParameterSpec ivps = new IvParameterSpec(secretBytes, keysize, blocksize); + + encrypt = Cipher.getInstance(dhAlgoStr + "/CBC/PKCS5Padding"); + encrypt.init(Cipher.ENCRYPT_MODE, sks, ivps); + } return encrypt; } @@ -430,10 +449,10 @@ public class GMSEncrypt implements Cloneable{ int blocksize = getBlockSize(dhSKAlgo); if (keysize == -1 || blocksize == -1) { - //TODO: how to do here - /*SecretKey sKey = ka.generateSecret(dhSKAlgo); - decrypt = Cipher.getInstance(dhSKAlgo); - decrypt.init(Cipher.DECRYPT_MODE, sKey);*/ + // TODO: how to do here, should we just throw runtime exception? + /* SecretKey sKey = ka.generateSecret(dhSKAlgo); + * decrypt = Cipher.getInstance(dhSKAlgo); + * decrypt.init(Cipher.DECRYPT_MODE, sKey); */ } else { String algoStr = getDhAlgoStr(dhSKAlgo); @@ -457,8 +476,6 @@ public class GMSEncrypt implements Cloneable{ SecretKey sKey = ka.generateSecret(dhSKAlgo); return sKey.getEncoded(); } else { - String algoStr = getDhAlgoStr(dhSKAlgo); - return ka.generateSecret(); } } @@ -478,16 +495,13 @@ public class GMSEncrypt implements Cloneable{ } /*** * this will hold the common key for cluster - * that will be created using publickey of all the members.. - * */ protected class ClusterEncryptor{ byte[] secretBytes; + //TODO: need to look this is thread safe Cipher encrypt; Cipher decrypt; - int viewId; - Set<InternalDistributedMember> mbrs; - + public ClusterEncryptor(GMSEncrypt other) throws Exception { GMSEncrypt mine = new GMSEncrypt(other.services); this.secretBytes = GMSEncrypt.generateSecret(mine.dhSKAlgo, mine.dhPrivateKey, other.dhPublicKey); @@ -496,8 +510,8 @@ public class GMSEncrypt implements Cloneable{ public ClusterEncryptor(byte[] sb) { this.secretBytes = sb; } - - public byte [] encryptBytes(byte[] data) throws Exception { + + public synchronized byte[] encryptBytes(byte[] data) throws Exception { String algo = dhSKAlgo; return GMSEncrypt.encryptBytes(data, getEncryptCipher(algo)); } @@ -516,9 +530,8 @@ public class GMSEncrypt implements Cloneable{ } return encrypt; } - - public byte[] decryptBytes(byte[] data) throws Exception - { + + public synchronized byte[] decryptBytes(byte[] data) throws Exception { String algo = dhSKAlgo; Cipher c = getDecryptCipher(algo); return GMSEncrypt.decryptBytes(data, c); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 331d672..42064cf 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -16,7 +16,60 @@ */ package com.gemstone.gemfire.distributed.internal.membership.gms.messenger; -import com.gemstone.gemfire.*; +import static com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil.replaceStrings; +import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST; +import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE; +import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_REQ; +import static com.gemstone.gemfire.internal.DataSerializableFixedID.FIND_COORDINATOR_RESP; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.logging.log4j.Logger; +import org.jgroups.Address; +import org.jgroups.Event; +import org.jgroups.JChannel; +import org.jgroups.Message; +import org.jgroups.Message.Flag; +import org.jgroups.Message.TransientFlag; +import org.jgroups.ReceiverAdapter; +import org.jgroups.View; +import org.jgroups.ViewId; +import org.jgroups.conf.ClassConfigurator; +import org.jgroups.protocols.UDP; +import org.jgroups.protocols.pbcast.NAKACK2; +import org.jgroups.stack.IpAddress; +import org.jgroups.util.Digest; +import org.jgroups.util.UUID; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.ForcedDisconnectException; +import com.gemstone.gemfire.GemFireConfigException; +import com.gemstone.gemfire.GemFireIOException; +import com.gemstone.gemfire.SystemConnectException; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.DurableClientAttributes; import com.gemstone.gemfire.distributed.internal.*; @@ -28,10 +81,17 @@ import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember; import com.gemstone.gemfire.distributed.internal.membership.gms.Services; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler; import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Messenger; +import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest; +import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage; import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage; -import com.gemstone.gemfire.internal.*; +import com.gemstone.gemfire.internal.ClassPathLoader; +import com.gemstone.gemfire.internal.HeapDataOutputStream; +import com.gemstone.gemfire.internal.InternalDataSerializer; +import com.gemstone.gemfire.internal.OSProcess; +import com.gemstone.gemfire.internal.SocketCreator; import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.VersionedDataInputStream; import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig; import com.gemstone.gemfire.internal.cache.DirectReplyMessage; import com.gemstone.gemfire.internal.cache.DistributedCacheOperation; @@ -225,11 +285,11 @@ public class JGroupsMessenger implements Messenger { if ( !dc.getSecurityClientDHAlgo().isEmpty() ) { try { this.encrypt = new GMSEncrypt(services); + logger.info("Initializing GMSEncrypt "); } catch (Exception e) { throw new GemFireConfigException("problem initializing encryption protocol", e); } } - } @Override @@ -355,12 +415,7 @@ public class JGroupsMessenger implements Messenger { addressesWithIoExceptionsProcessed.clear(); if (encrypt != null) { - try { - encrypt.installView(v); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } + encrypt.installView(v); } } @@ -573,20 +628,7 @@ public class JGroupsMessenger implements Messenger { public Set<InternalDistributedMember> sendUnreliably(DistributionMessage msg) { return send(msg, false); } - - @Override - public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) { - if (this.encrypt != null) { - try { - this.encrypt.installView(alternateView); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - } - return send(msg, true); - } - + @Override public Set<InternalDistributedMember> send(DistributionMessage msg) { return send(msg, true); @@ -636,7 +678,7 @@ public class JGroupsMessenger implements Messenger { if (useMcast) { long startSer = theStats.startMsgSerialization(); - Message jmsg = createJGMessage(msg, local, null, Version.CURRENT_ORDINAL); + Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL); theStats.endMsgSerialization(startSer); Exception problem; @@ -678,7 +720,7 @@ public class JGroupsMessenger implements Messenger { } // useMcast else { // ! useMcast int len = destinations.length; - List<InternalDistributedMember> calculatedMembers; // explicit list of members + List<GMSMember> calculatedMembers; // explicit list of members int calculatedLen; // == calculatedMembers.len if (len == 1 && destinations[0] == DistributionMessage.ALL_RECIPIENTS) { // send to all // Grab a copy of the current membership @@ -686,40 +728,51 @@ public class JGroupsMessenger implements Messenger { // Construct the list calculatedLen = v.size(); - calculatedMembers = new LinkedList<InternalDistributedMember>(); + calculatedMembers = new LinkedList<GMSMember>(); for (int i = 0; i < calculatedLen; i ++) { InternalDistributedMember m = (InternalDistributedMember)v.get(i); - calculatedMembers.add(m); + calculatedMembers.add((GMSMember)m.getNetMember()); } } // send to all else { // send to explicit list calculatedLen = len; - calculatedMembers = new LinkedList<>(); + calculatedMembers = new LinkedList<GMSMember>(); for (int i = 0; i < calculatedLen; i ++) { - calculatedMembers.add(destinations[i]); + calculatedMembers.add((GMSMember)destinations[i].getNetMember()); } } // send to explicit list Int2ObjectOpenHashMap<Message> messages = new Int2ObjectOpenHashMap<>(); long startSer = theStats.startMsgSerialization(); - - boolean encode = (encrypt != null); - boolean firstMessage = true; + for (Iterator<GMSMember> it=calculatedMembers.iterator(); it.hasNext(); ) { + GMSMember mbr = it.next(); + short version = mbr.getVersionOrdinal(); + if ( !messages.containsKey(version) ) { + Message jmsg = createJGMessage(msg, local, version); + messages.put(version, jmsg); + if (firstMessage) { + theStats.incSentBytes(jmsg.getLength()); + firstMessage = false; + } + } + } + theStats.endMsgSerialization(startSer); Collections.shuffle(calculatedMembers); int i=0; - for (InternalDistributedMember mbr: calculatedMembers) { - short version = mbr.getNetMember().getVersionOrdinal(); + for (GMSMember mbr: calculatedMembers) { JGAddress to = new JGAddress(mbr); - Message jmsg = createJGMessage(msg, local, mbr, version); + short version = mbr.getVersionOrdinal(); + Message jmsg = (Message)messages.get(version); Exception problem = null; try { + Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg; if (!reliably) { jmsg.setFlag(Message.Flag.NO_RELIABILITY); } - jmsg.setDest(to); - jmsg.setSrc(this.jgAddress); + tmp.setDest(to); + tmp.setSrc(this.jgAddress); logger.trace("Unicasting to {}", to); - myChannel.send(jmsg); + myChannel.send(tmp); } catch (Exception e) { problem = e; @@ -775,7 +828,7 @@ public class JGroupsMessenger implements Messenger { * @param version the version of the recipient * @return the new message */ - Message createJGMessage(DistributionMessage gfmsg, JGAddress src, InternalDistributedMember recipient, short version) { + Message createJGMessage(DistributionMessage gfmsg, JGAddress src, short version) { if(gfmsg instanceof DirectReplyMessage) { ((DirectReplyMessage) gfmsg).registerProcessor(); } @@ -785,35 +838,17 @@ public class JGroupsMessenger implements Messenger { setMessageFlags(gfmsg, msg); try { long start = services.getStatistics().startMsgSerialization(); - HeapDataOutputStream out_stream = - new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version)); + byte[] messageBytes = null; + HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version)); Version.CURRENT.writeOrdinal(out_stream, true); - DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream); - boolean encode = encrypt != null && recipient != null; - if (encode) { - // Coordinator doesn't know our publicKey for a JoinRequest - if (gfmsg.getDSFID() == JOIN_REQUEST || gfmsg.getDSFID() == JOIN_RESPONSE) { - encode = false; - } - } - if (encode) { - logger.info("encoding {}", gfmsg); - try { - out_stream.writeBoolean(true); // TODO we should have flag bits - HeapDataOutputStream out_stream2 = - new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version)); - DataSerializer.writeObject(gfmsg, out_stream2); - byte[] payload = out_stream2.toByteArray(); - payload = encrypt.encryptData(payload, recipient); - DataSerializer.writeByteArray(payload, out_stream); - } catch (Exception e) { - throw new GemFireIOException("unable to send message", e); - } + if(encrypt != null) { + out_stream.writeBoolean(true); + writeEncryptedMessage(gfmsg, version, out_stream); } else { - logger.info("not encoding {}", gfmsg); out_stream.writeBoolean(false); - DataSerializer.writeObject(gfmsg, out_stream); + serializeMessage(gfmsg, out_stream); } + msg.setBuffer(out_stream.toByteArray()); services.getStatistics().endMsgSerialization(start); } @@ -827,9 +862,82 @@ public class JGroupsMessenger implements Messenger { ioe.initCause(ex); throw ioe; } + } catch(Exception ex){ + logger.warn("Error serializing message", ex); + GemFireIOException ioe = new + GemFireIOException("Error serializing message"); + ioe.initCause(ex.getCause()); + throw ioe; } return msg; } + + void writeEncryptedMessage(DistributionMessage gfmsg, short version, HeapDataOutputStream out) throws Exception { + InternalDataSerializer.writeDSFIDHeader(gfmsg.getDSFID(), out); + byte[] pk = null; + int requestId = 0; + InternalDistributedMember pkMbr = null; + switch (gfmsg.getDSFID()) { + case FIND_COORDINATOR_REQ: + case JOIN_REQUEST: + //need to append mine PK + pk = encrypt.getPublicKey(localAddress); + + pkMbr = gfmsg.getRecipients()[0]; + requestId = getRequestId(gfmsg, true); + break; + case FIND_COORDINATOR_RESP: + case JOIN_RESPONSE: + pkMbr = gfmsg.getRecipients()[0]; + requestId = getRequestId(gfmsg, false); + default: + break; + } + logger.debug("writeEncryptedMessage gfmsg.getDSFID() = {} for {} with requestid {}", gfmsg.getDSFID(), pkMbr, requestId); + out.writeInt(requestId); + if (pk != null) { + InternalDataSerializer.writeByteArray(pk, out); + } + + HeapDataOutputStream out_stream = new HeapDataOutputStream(Version.fromOrdinalOrCurrent(version)); + byte[] messageBytes = serializeMessage(gfmsg, out_stream); + + if (pkMbr != null) { + // using members private key + messageBytes = encrypt.encryptData(messageBytes, pkMbr); + } else { + // using cluster secret key + messageBytes = encrypt.encryptData(messageBytes); + } + InternalDataSerializer.writeByteArray(messageBytes, out); + } + + int getRequestId(DistributionMessage gfmsg, boolean add) { + int requestId = 0; + if (gfmsg instanceof FindCoordinatorRequest) { + requestId = ((FindCoordinatorRequest) gfmsg).getRequestId(); + } else if (gfmsg instanceof JoinRequestMessage) { + requestId = ((JoinRequestMessage) gfmsg).getRequestId(); + } else if (gfmsg instanceof FindCoordinatorResponse) { + requestId = ((FindCoordinatorResponse) gfmsg).getRequestId(); + } else if (gfmsg instanceof JoinResponseMessage) { + requestId = ((JoinResponseMessage) gfmsg).getRequestId(); + } + + if (add) { + addRequestId(requestId, gfmsg.getRecipients()[0]); + } + + return requestId; + } + + byte[] serializeMessage(DistributionMessage gfmsg, HeapDataOutputStream out_stream) throws IOException { + + DataSerializer.writeObject(this.localAddress.getNetMember(), out_stream); + DataSerializer.writeObject(gfmsg, out_stream); + + return out_stream.toByteArray(); + } void setMessageFlags(DistributionMessage gfmsg, Message msg) { // GemFire uses its own reply processors so there is no need @@ -870,17 +978,14 @@ public class JGroupsMessenger implements Messenger { // as STABLE_GOSSIP logger.trace("message length is zero - ignoring"); return null; - } - - InternalDistributedMember sender; + } Exception problem = null; byte[] buf = jgmsg.getRawBuffer(); try { long start = services.getStatistics().startMsgDeserialization(); - - DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, jgmsg.getOffset(), jgmsg.getLength())); short ordinal = Version.readOrdinal(dis); @@ -889,44 +994,33 @@ public class JGroupsMessenger implements Messenger { dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow( ordinal, true)); } - - GMSMember m = DataSerializer.readObject(dis); - - sender = getMemberFromView(m, ordinal); - - boolean encrypted = dis.readBoolean(); - - if (encrypted && encrypt != null) { - byte[] payload = DataSerializer.readByteArray(dis); - try { - payload = encrypt.decryptData(payload, sender); - dis = new DataInputStream(new ByteArrayInputStream(payload)); - if (ordinal < Version.CURRENT_ORDINAL) { - dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow( - ordinal, true)); - } - } catch (Exception e) { - throw new GemFireIOException("unable to receive message", e); - } - } - - result = DataSerializer.readObject(dis); - - DistributionMessage dm = (DistributionMessage)result; + + //read + boolean isEncrypted = dis.readBoolean(); - // JoinRequestMessages are sent with an ID that may have been - // reused from a previous life by way of auto-reconnect, - // so we don't want to find a canonical reference for the - // request's sender ID - if (dm.getDSFID() == JOIN_REQUEST) { - sender = ((JoinRequestMessage)dm).getMemberID(); + if(isEncrypted && encrypt == null) { + throw new GemFireConfigException("Got remote message as encrypted"); + } + + if(isEncrypted) { + result = readEncryptedMessage(dis, ordinal, encrypt); + } else { + GMSMember m = DataSerializer.readObject(dis); + + result = DataSerializer.readObject(dis); + + DistributionMessage dm = (DistributionMessage)result; + + setSender(dm, m, ordinal); } - ((DistributionMessage)result).setSender(sender); + services.getStatistics().endMsgDeserialization(start); } catch (ClassNotFoundException | IOException | RuntimeException e) { problem = e; + } catch(Exception e) { + problem = e; } if (problem != null) { logger.error(LocalizedMessage.create( @@ -937,37 +1031,113 @@ public class JGroupsMessenger implements Messenger { return result; } - - /** look for certain messages that may need to be altered before being sent */ - void filterOutgoingMessage(DistributionMessage m) { - switch (m.getDSFID()) { + void setSender(DistributionMessage dm, GMSMember m, short ordinal) { + InternalDistributedMember sender = null; + // JoinRequestMessages are sent with an ID that may have been + // reused from a previous life by way of auto-reconnect, + // so we don't want to find a canonical reference for the + // request's sender ID + if (dm.getDSFID() == JOIN_REQUEST) { + sender = ((JoinRequestMessage)dm).getMemberID(); + } else { + sender = getMemberFromView(m, ordinal); + } + dm.setSender(sender); + } + + @SuppressWarnings("resource") + DistributionMessage readEncryptedMessage(DataInputStream dis, short ordinal, GMSEncrypt encryptLocal) throws Exception { + int dfsid = InternalDataSerializer.readDSFIDHeader(dis); + int requestId = dis.readInt(); + + try { + // TODO seems like we don't need this, just set bit that PK is appended + + logger.debug("readEncryptedMessage Reading Request id " + dfsid + " and requestid is " + requestId + " myid " + this.localAddress); + InternalDistributedMember pkMbr = null; + boolean readPK = false; + switch (dfsid) { + case FIND_COORDINATOR_REQ: case JOIN_REQUEST: - if (encrypt == null) { - break; - } - JoinRequestMessage joinMsg = (JoinRequestMessage)m; - joinMsg.setPublicKey(encrypt.getPublicKeyBytes()); + readPK = true; break; - + case FIND_COORDINATOR_RESP: case JOIN_RESPONSE: - JoinResponseMessage jrsp = (JoinResponseMessage)m; + // this will have requestId to know the PK + pkMbr = getRequestedMember(requestId); + break; + } + + byte[] data; + + byte[] pk = null; + + if (readPK) { + // need to read PK + pk = InternalDataSerializer.readByteArray(dis); + // encrypt.setPublicKey(publickey, mbr); + data = InternalDataSerializer.readByteArray(dis); + // using prefixed pk from sender + data = encryptLocal.decryptData(data, pk); + } else { + data = InternalDataSerializer.readByteArray(dis); + // from cluster key + if (pkMbr != null) { + // using member public key + data = encryptLocal.decryptData(data, pkMbr); + } else { + // from cluster key + data = encryptLocal.decryptData(data); + } + } + + { + DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); + + if (ordinal < Version.CURRENT_ORDINAL) { + in = new VersionedDataInputStream(in, Version.fromOrdinalNoThrow(ordinal, true)); + } + + GMSMember m = DataSerializer.readObject(in); + + DistributionMessage result = (DistributionMessage) DataSerializer.readObject(in); + + setSender(result, m, ordinal); + + if (pk != null) { + encryptLocal.setPublicKey(pk, result.getSender()); + } + + return result; + } + } catch (Exception e) { + throw new Exception("Message id is " + dfsid, e); + } - if (jrsp.getRejectionMessage() == null + } + + /** look for certain messages that may need to be altered before being sent */ + void filterOutgoingMessage(DistributionMessage m) { + switch (m.getDSFID()) { + case JOIN_RESPONSE: + JoinResponseMessage jrsp = (JoinResponseMessage)m; + + if (jrsp.getRejectionMessage() == null && services.getConfig().getTransport().isMcastEnabled()) { - // get the multicast message digest and pass it with the join response - Digest digest = (Digest)this.myChannel.getProtocolStack() + // get the multicast message digest and pass it with the join response + Digest digest = (Digest)this.myChannel.getProtocolStack() .getTopProtocol().down(Event.GET_DIGEST_EVT); - HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT); - try { - digest.writeTo(hdos); - } catch (Exception e) { - logger.fatal("Unable to serialize JGroups messaging digest", e); - } - jrsp.setMessengerData(hdos.toByteArray()); + HeapDataOutputStream hdos = new HeapDataOutputStream(500, Version.CURRENT); + try { + digest.writeTo(hdos); + } catch (Exception e) { + logger.fatal("Unable to serialize JGroups messaging digest", e); } - break; - default: - break; + jrsp.setMessengerData(hdos.toByteArray()); + } + break; + default: + break; } } @@ -1152,7 +1322,7 @@ public class JGroupsMessenger implements Messenger { if (clazz.isAssignableFrom(msgClazz)) { h = handlers.get(clazz); handlers.put(msg.getClass(), h); - break; + break; } } } @@ -1163,6 +1333,72 @@ public class JGroupsMessenger implements Messenger { } } + @Override + public Set<InternalDistributedMember> send(DistributionMessage msg, NetView alternateView) { + if (this.encrypt != null) { + this.encrypt.installView(alternateView); + } + return send(msg, true); + } + + @Override + public byte[] getPublickey(InternalDistributedMember mbr) { + if (encrypt != null) { + return encrypt.getPublicKey(mbr); + } + return null; + } + + @Override + public void setPublicKey(byte[] publickey, InternalDistributedMember mbr) { + if (encrypt != null) { + logger.debug("Setting pK for member " + mbr); + encrypt.setPublicKey(publickey, mbr); + } + } + + @Override + public void setClusterSecretKey(byte[] clusterSecretKey) { + if (encrypt != null) { + logger.debug("Setting cluster key"); + encrypt.addClusterKey(clusterSecretKey); + } + } + + @Override + public byte[] getClusterSecretKey() { + if (encrypt != null) { + return encrypt.getClusterSecretKey(); + } + return null; + } + + private Random randomId = new Random(); + private HashMap<Integer, InternalDistributedMember> requestIdVsRecipients = new HashMap<>(); + + InternalDistributedMember getRequestedMember(int requestId) { + //TODO: what if we don't get response, need to remove this otherwise it will be leak + return requestIdVsRecipients.remove(requestId); + } + void addRequestId(int requestId, InternalDistributedMember mbr) { + requestIdVsRecipients.put(requestId, mbr); + } + @Override + public int getRequestId() { + return randomId.nextInt(); + } + + @Override + public void initClusterKey() { + if (encrypt != null) { + try { + logger.debug("Initializing cluster key"); + encrypt.initClusterSecretKey(); + } catch (Exception e) { + throw new RuntimeException("unable to create cluster key ", e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java index 04770bb..730fe6f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/InternalDataSerializer.java @@ -2644,6 +2644,24 @@ public abstract class InternalDataSerializer extends DataSerializer implements D } } + public static final int readDSFIDHeader(final DataInput in) + throws IOException, ClassNotFoundException + { + checkIn(in); + byte header = in.readByte(); + if (header == DS_FIXED_ID_BYTE) { + return in.readByte(); + } else if (header == DS_FIXED_ID_SHORT) { + return in.readShort(); + } else if (header == DS_NO_FIXED_ID) { + return Integer.MAX_VALUE;//is that correct?? + } else if (header == DS_FIXED_ID_INT) { + return in.readInt(); + } else { + throw new IllegalStateException("unexpected byte: " + header + " while reading dsfid"); + } + } + /** * Reads an instance of <code>String</code> from a * <code>DataInput</code> given the header byte already being read. http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5247cce/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java index 3c05794..e180326 100755 --- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DistributedMulticastRegionDUnitTest.java @@ -33,6 +33,7 @@ import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionAttributes; import com.gemstone.gemfire.cache.Scope; import com.gemstone.gemfire.distributed.Locator; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.InternalLocator; import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory; @@ -253,6 +254,7 @@ public class DistributedMulticastRegionDUnitTest extends JUnit4CacheTestCase { p.put(MCAST_TTL, mcastttl); p.put(LOCATORS, "localhost[" + locatorPort + "]"); p.put(LOG_LEVEL, "info"); + p.put(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128"); return p; } @@ -272,30 +274,30 @@ public class DistributedMulticastRegionDUnitTest extends JUnit4CacheTestCase { } private int startLocator() { - final int [] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3); - final int locatorPort = ports[0]; - - VM locator1Vm = Host.getHost(0).getVM(locatorVM); - locator1Vm.invoke(new SerializableCallable() { - @Override - public Object call() { - final File locatorLogFile = new File(getTestMethodName() + "-locator-" + locatorPort + ".log"); - final Properties locatorProps = new Properties(); - locatorProps.setProperty(NAME, "LocatorWithMcast"); - locatorProps.setProperty(MCAST_PORT, mcastport); - locatorProps.setProperty(MCAST_TTL, mcastttl); - locatorProps.setProperty(LOG_LEVEL, "info"); - //locatorProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true"); - try { - final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locatorPort, null, null, - locatorProps); - System.out.println("test Locator started " + locatorPort); - } catch (IOException ioex) { - fail("Unable to create a locator with a shared configuration"); - } - - return null; + final int [] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3); + final int locatorPort = ports[0]; + + VM locator1Vm = Host.getHost(0).getVM(locatorVM);; + locator1Vm.invoke(new SerializableCallable() { + @Override + public Object call() { + final File locatorLogFile = new File(getTestMethodName() + "-locator-" + locatorPort + ".log"); + final Properties locatorProps = new Properties(); + locatorProps.setProperty(DistributionConfig.NAME_NAME, "LocatorWithMcast"); + locatorProps.setProperty(DistributionConfig.MCAST_PORT_NAME, mcastport); + locatorProps.setProperty(DistributionConfig.MCAST_TTL_NAME, mcastttl); + locatorProps.setProperty(DistributionConfig.LOG_LEVEL_NAME, "info"); + locatorProps.setProperty(DistributionConfig.SECURITY_CLIENT_DHALGO_NAME, "AES:128"); + //locatorProps.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true"); + try { + final InternalLocator locator = (InternalLocator) Locator.startLocatorAndDS(locatorPort, null, null, + locatorProps); + System.out.println("test Locator started " + locatorPort); + } catch (IOException ioex) { + fail("Unable to create a locator with a shared configuration"); } + return null; + } }); return locatorPort; }
