This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51d048a93a Add cluster metadata id to gossip syn messages
51d048a93a is described below
commit 51d048a93a7e7cfb93a544dabba4b6f7aa1bbdd1
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Mon Apr 29 11:36:51 2024 +0100
Add cluster metadata id to gossip syn messages
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for
CASSANDRA-19613
---
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/GossipDigestSyn.java | 15 +-
.../cassandra/gms/GossipDigestSynVerbHandler.java | 7 +
src/java/org/apache/cassandra/gms/Gossiper.java | 3 +
src/java/org/apache/cassandra/gms/NewGossiper.java | 6 +-
test/data/serialization/5.1/gms.EndpointState.bin | Bin 73 -> 73 bytes
test/data/serialization/5.1/gms.Gossip.bin | Bin 166 -> 4914 bytes
.../distributed/test/tcm/SplitBrainTest.java | 161 ++++++++++++++++++---
.../apache/cassandra/gms/SerializationsTest.java | 7 +-
.../org/apache/cassandra/net/HandshakeTest.java | 3 +-
.../cassandra/net/OutboundConnectionsTest.java | 3 +-
11 files changed, 179 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 168f40d224..c8355deda2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Add cluster metadata id to gossip syn messages (CASSANDRA-19613)
* Reduce heap usage occupied by the metrics (CASSANDRA-19567)
* Improve handling of transient replicas during range movements
(CASSANDRA-19344)
* Enable debounced internode log requests to be cancelled at shutdown
(CASSANDRA-19514)
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
index 7c2ae945c8..ec4639f087 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSyn.java
@@ -25,6 +25,8 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tcm.ClusterMetadata;
/**
* This is the first message that gets sent out as a start of the Gossip
protocol in a
@@ -37,12 +39,14 @@ public class GossipDigestSyn
final String clusterId;
final String partioner;
+ final int metadataId;
final List<GossipDigest> gDigests;
- public GossipDigestSyn(String clusterId, String partioner,
List<GossipDigest> gDigests)
+ public GossipDigestSyn(String clusterId, String partioner, int metadataId,
List<GossipDigest> gDigests)
{
this.clusterId = clusterId;
this.partioner = partioner;
+ this.metadataId = metadataId;
this.gDigests = gDigests;
}
@@ -85,6 +89,8 @@ class GossipDigestSynSerializer implements
IVersionedSerializer<GossipDigestSyn>
{
out.writeUTF(gDigestSynMessage.clusterId);
out.writeUTF(gDigestSynMessage.partioner);
+ if (version >= MessagingService.VERSION_51)
+ out.writeUnsignedVInt32(gDigestSynMessage.metadataId);
GossipDigestSerializationHelper.serialize(gDigestSynMessage.gDigests,
out, version);
}
@@ -93,14 +99,19 @@ class GossipDigestSynSerializer implements
IVersionedSerializer<GossipDigestSyn>
String clusterId = in.readUTF();
String partioner = null;
partioner = in.readUTF();
+ int metadataId = version >= MessagingService.VERSION_51
+ ? in.readUnsignedVInt32()
+ : ClusterMetadata.EMPTY_METADATA_IDENTIFIER;
List<GossipDigest> gDigests =
GossipDigestSerializationHelper.deserialize(in, version);
- return new GossipDigestSyn(clusterId, partioner, gDigests);
+ return new GossipDigestSyn(clusterId, partioner, metadataId, gDigests);
}
public long serializedSize(GossipDigestSyn syn, int version)
{
long size = TypeSizes.sizeof(syn.clusterId);
size += TypeSizes.sizeof(syn.partioner);
+ if (version >= MessagingService.VERSION_51)
+ size += TypeSizes.sizeofUnsignedVInt(syn.metadataId);
size += GossipDigestSerializationHelper.serializedSize(syn.gDigests,
version);
return size;
}
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index c2713863cc..692e962528 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -30,6 +30,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tcm.ClusterMetadata;
import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_ACK;
@@ -65,6 +66,12 @@ public class GossipDigestSynVerbHandler extends
GossipVerbHandler<GossipDigestSy
return;
}
+ if (gDigestMessage.metadataId !=
ClusterMetadata.current().metadataIdentifier)
+ {
+ logger.warn("Cluster metadata identifier mismatch from {} {}!={}",
from, gDigestMessage.metadataId, ClusterMetadata.current().metadataIdentifier);
+ return;
+ }
+
List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
// if the syn comes from a peer performing a shadow round and this
node is
// also currently in a shadow round, send back a minimal ack. This
node must
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 35cf57d3e1..7d3e0c9cc5 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -260,6 +260,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
{
GossipDigestSyn digestSynMessage = new
GossipDigestSyn(getClusterName(),
getPartitionerName(),
+
ClusterMetadata.current().metadataIdentifier,
gDigests);
Message<GossipDigestSyn> message =
Message.out(GOSSIP_DIGEST_SYN, digestSynMessage);
/* Gossip to some random live member */
@@ -1353,6 +1354,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
InetAddressAndPort ep = entry.getKey();
if (ep.equals(getBroadcastAddressAndPort()))
continue;
+
if (justRemovedEndpoints.containsKey(ep))
{
if (logger.isTraceEnabled())
@@ -2223,6 +2225,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean,
Gossiper.instance.makeGossipDigest(gDigests);
GossipDigestSyn digestSynMessage = new
GossipDigestSyn(getClusterName(),
getPartitionerName(),
+
ClusterMetadata.current().metadataIdentifier,
gDigests);
Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN,
digestSynMessage);
sendGossip(message, cms);
diff --git a/src/java/org/apache/cassandra/gms/NewGossiper.java
b/src/java/org/apache/cassandra/gms/NewGossiper.java
index 8473a40d34..4bb8b75968 100644
--- a/src/java/org/apache/cassandra/gms/NewGossiper.java
+++ b/src/java/org/apache/cassandra/gms/NewGossiper.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.compatibility.GossipHelper;
import org.apache.cassandra.utils.concurrent.Accumulator;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
@@ -127,7 +128,10 @@ public class NewGossiper
public Promise<Map<InetAddressAndPort, EndpointState>> doShadowRound()
{
// send a completely empty syn
- GossipDigestSyn digestSynMessage = new
GossipDigestSyn(getClusterName(), getPartitionerName(), new ArrayList<>());
+ GossipDigestSyn digestSynMessage = new
GossipDigestSyn(getClusterName(),
+
getPartitionerName(),
+
ClusterMetadata.current().metadataIdentifier,
+ new
ArrayList<>());
Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN,
digestSynMessage);
logger.info("Sending shadow round GOSSIP DIGEST SYN to known peers
{}", peers);
diff --git a/test/data/serialization/5.1/gms.EndpointState.bin
b/test/data/serialization/5.1/gms.EndpointState.bin
index d5c4eacd84..9baeac9816 100644
Binary files a/test/data/serialization/5.1/gms.EndpointState.bin and
b/test/data/serialization/5.1/gms.EndpointState.bin differ
diff --git a/test/data/serialization/5.1/gms.Gossip.bin
b/test/data/serialization/5.1/gms.Gossip.bin
index 7a4fb5666e..083da42747 100644
Binary files a/test/data/serialization/5.1/gms.Gossip.bin and
b/test/data/serialization/5.1/gms.Gossip.bin differ
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java
index 9533eb393d..1382f8b063 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tcm/SplitBrainTest.java
@@ -19,49 +19,171 @@
package org.apache.cassandra.distributed.test.tcm;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.SimpleSeedProvider;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.log.LogState;
+import org.awaitility.Awaitility;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class SplitBrainTest extends TestBaseImpl
{
@Test
public void testSplitBrainStartup() throws IOException, TimeoutException
{
- // partition the cluster in 2 parts on startup, node1, node2 in one,
node3, node4 in the other
- try (Cluster cluster = builder().withNodes(4)
- .withConfig(config ->
config.with(GOSSIP).with(NETWORK)
-
.set("seed_provider", new
IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(),
-
Collections.singletonMap("seeds", "127.0.0.1,127.0.0.3")))
-
.set("discovery_timeout", "1s"))
- .createWithoutStarting())
+ try (Setup setup = setupSplitBrainCluster())
{
- cluster.filters().allVerbs().from(1,2).to(3,4).drop();
- cluster.filters().allVerbs().from(3,4).to(1,2).drop();
- List<Thread> startupThreads = new ArrayList<>(4);
- for (int i = 0; i < 4; i++)
- {
- int threadNr = i + 1;
- startupThreads.add(new Thread(() ->
cluster.get(threadNr).startup()));
- }
- startupThreads.forEach(Thread::start);
- startupThreads.forEach(SplitBrainTest::join);
- cluster.filters().reset();
+ Cluster cluster = setup.cluster;
+ // Perform a schema change on one of the clusters resulting from
the split brain during initialisation
+ // before dropping message filters. When comms can be
reestablished, we fake the replication of metdata
+ // state from on cluster to the other.
cluster.coordinator(1).execute(withKeyspace("create keyspace %s
with replication = {'class':'SimpleStrategy', 'replication_factor':1}"),
ConsistencyLevel.ALL);
+ long clusterOneEpoch =
ClusterUtils.getCurrentEpoch(cluster.get(1)).getEpoch();
+ long clusterTwoEpoch =
ClusterUtils.getCurrentEpoch(cluster.get(3)).getEpoch();
+ assertTrue(clusterOneEpoch > clusterTwoEpoch);
+
+ // Artificially induce node1 to replicate to node3. This should be
rejected by node3 as the two technically
+ // belong to different clusters.
+ long mark = cluster.get(3).logs().mark();
+ // Turn off the initial filters
+ setup.reenableCommunication();
+
+ cluster.get(1).runOnInstance(() -> {
+ LogState state =
LogState.getForRecovery(ClusterMetadata.current().epoch);
+
MessagingService.instance().send(Message.out(Verb.TCM_REPLICATION, state),
+
InetAddressAndPort.getByNameUnchecked("127.0.0.3"));
+ });
+ cluster.get(3).logs().watchFor(mark, Duration.ofSeconds(10),
"Cluster Metadata Identifier mismatch");
+ assertEquals(clusterOneEpoch,
ClusterUtils.getCurrentEpoch(cluster.get(1)).getEpoch());
+ assertEquals(clusterTwoEpoch,
ClusterUtils.getCurrentEpoch(cluster.get(3)).getEpoch());
+ }
+ }
+
+
+ @Test
+ public void testFilterGossipStatesWithMismatchingMetadataId() throws
IOException, TimeoutException
+ {
+ try (Setup setup = setupSplitBrainCluster())
+ {
+ Cluster cluster = setup.cluster;
+ // Allow nodes from the two clusters to communicate again. Because
each node's seed list contains an
+ // instance from the other cluster, they will attempt to perform
gossip exchange with that instance.
+ // Verify that when this happens, gossip state isn't updated with
instances from the other cluster.
+ AtomicInteger node1Received = new AtomicInteger(0);
+ AtomicInteger node3Received = new AtomicInteger(0);
+
+
cluster.filters().inbound().from(1,2,3,4).to(1,2,3,4).messagesMatching((from,
to, msg) -> {
+ if (msg.verb() == Verb.GOSSIP_DIGEST_SYN.id ||
+ msg.verb() == Verb.GOSSIP_DIGEST_ACK.id ||
+ msg.verb() == Verb.GOSSIP_DIGEST_ACK2.id)
+ {
+ if (to == 1 && (from == 3 || from == 4))
+ node1Received.incrementAndGet();
+ if (to == 3 && (from == 1 || from == 2))
+ node3Received.incrementAndGet();
+ }
+ return false;
+ }).drop().on();
+
+ // Turn off the initial filters
+ setup.reenableCommunication();
+
+ // Wait for cross-cluster gossip communication
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(30))
+ .until(() -> node1Received.get() > 5 &&
node3Received.get() > 5);
+
+ // Verify that gossip states for nodes which are not a member of
the same cluster were disregarded.
+ // Each node should have gossip state only for itself and the one
other member of its cluster.
+ cluster.forEach(inst -> {
+ int id = inst.config().num();
+ boolean gossipStateValid = inst.callOnInstance((() -> {
+ Map<InetAddressAndPort, EndpointState> eps =
Gossiper.instance.endpointStateMap;
+ if (eps.size() != 2)
+ return false;
+ Collection<InetAddressAndPort> expectedEps = (id <= 2)
+ ?
Arrays.asList(InetAddressAndPort.getByNameUnchecked("127.0.0.1"),
+
InetAddressAndPort.getByNameUnchecked("127.0.0.2"))
+ :
Arrays.asList(InetAddressAndPort.getByNameUnchecked("127.0.0.3"),
+
InetAddressAndPort.getByNameUnchecked("127.0.0.4"));
+ return eps.keySet().containsAll(expectedEps);
+ }));
+ Assert.assertTrue(String.format("Unexpected gossip state on
node %s", id), gossipStateValid);
+ });
+ }
+ }
- cluster.get(3).logs().watchFor("Cluster Metadata Identifier
mismatch");
+ private Setup setupSplitBrainCluster() throws IOException
+ {
+ // partition the cluster in 2 parts on startup, node1, node2 in one,
node3, node4 in the other
+ Cluster cluster = builder().withNodes(4)
+ .withConfig(config ->
config.with(GOSSIP).with(NETWORK)
+
.set("seed_provider", new
IInstanceConfig.ParameterizedClass(SimpleSeedProvider.class.getName(),
+
Collections.singletonMap("seeds",
"127.0.0.1,127.0.0.3")))
+
.set("discovery_timeout", "1s"))
+ .createWithoutStarting();
+ IMessageFilters.Filter drop1 = cluster.filters().allVerbs().from(1,
2).to(3, 4).drop();
+ IMessageFilters.Filter drop2 = cluster.filters().allVerbs().from(3,
4).to(1, 2).drop();
+ List<Thread> startupThreads = new ArrayList<>(4);
+ for (int i = 0; i < 4; i++)
+ {
+ int threadNr = i + 1;
+ startupThreads.add(new Thread(() ->
cluster.get(threadNr).startup()));
+ }
+ startupThreads.forEach(Thread::start);
+ startupThreads.forEach(SplitBrainTest::join);
+ return new Setup(cluster, drop1, drop2);
+ }
+
+ private final class Setup implements AutoCloseable
+ {
+ final Cluster cluster;
+ final IMessageFilters.Filter[] filters;
+
+ Setup(Cluster cluster, IMessageFilters.Filter ... filters)
+ {
+ this.cluster = cluster;
+ this.filters = filters;
+ }
+
+ void reenableCommunication()
+ {
+ for (IMessageFilters.Filter filter : filters)
+ filter.off();
+ }
+
+ @Override
+ public void close()
+ {
+ cluster.close();
}
}
@@ -76,4 +198,5 @@ public class SplitBrainTest extends TestBaseImpl
throw new RuntimeException(e);
}
}
+
}
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index 1dc44345de..c7320b194e 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -87,6 +87,7 @@ public class SerializationsTest extends
AbstractSerializationsTester
GossipDigestAck2 ack2 = new GossipDigestAck2(states);
GossipDigestSyn syn = new GossipDigestSyn("Not a real cluster name",
ClusterMetadata.current().tokenMap.partitioner().getClass().getCanonicalName(),
+ 20240430,
Statics.Digests);
DataOutputStreamPlus out = getOutput("gms.Gossip.bin");
@@ -113,8 +114,8 @@ public class SerializationsTest extends
AbstractSerializationsTester
int count = 0;
FileInputStreamPlus in = getInput("gms.Gossip.bin");
- while (count < Statics.Digests.size())
- assert GossipDigestAck2.serializer.deserialize(in, getVersion())
!= null;
+ while (count++ < Statics.Digests.size())
+ assert GossipDigest.serializer.deserialize(in, getVersion()) !=
null;
assert GossipDigestAck.serializer.deserialize(in, getVersion()) !=
null;
assert GossipDigestAck2.serializer.deserialize(in, getVersion()) !=
null;
assert GossipDigestSyn.serializer.deserialize(in, getVersion()) !=
null;
@@ -130,7 +131,7 @@ public class SerializationsTest extends
AbstractSerializationsTester
private static VersionedValue vv0 = vvFact.load(23d);
private static VersionedValue vv1 =
vvFact.bootstrapping(Collections.<Token>singleton(partitioner.getRandomToken()));
private static List<GossipDigest> Digests = new
ArrayList<GossipDigest>();
-
+ static
{
HeartbeatSt.updateHeartBeat();
EndpointSt.addApplicationState(ApplicationState.LOAD, vv0);
diff --git a/test/unit/org/apache/cassandra/net/HandshakeTest.java
b/test/unit/org/apache/cassandra/net/HandshakeTest.java
index 99e33eea90..b66ade056e 100644
--- a/test/unit/org/apache/cassandra/net/HandshakeTest.java
+++ b/test/unit/org/apache/cassandra/net/HandshakeTest.java
@@ -57,6 +57,7 @@ import static
org.apache.cassandra.net.MessagingService.minimum_version;
import static org.apache.cassandra.net.OutboundConnectionInitiator.Result;
import static
org.apache.cassandra.net.OutboundConnectionInitiator.SslFallbackConnectionType;
import static
org.apache.cassandra.net.OutboundConnectionInitiator.initiateMessaging;
+import static
org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -317,7 +318,7 @@ public class HandshakeTest
.withDebugCallbacks(new HandshakeAcknowledgeChecker(t -> handshakeEx =
t))
.withFrom(FROM_ADDR);
OutboundConnections outboundConnections =
OutboundConnections.tryRegister(new ConcurrentHashMap<>(), TO_ADDR, settings);
- GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner",
new ArrayList<>(0));
+ GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner",
EMPTY_METADATA_IDENTIFIER, new ArrayList<>(0));
Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN,
syn);
OutboundConnection outboundConnection =
outboundConnections.connectionFor(message);
outboundConnection.enqueue(message);
diff --git a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
index 6acdf0ce21..5afbe88315 100644
--- a/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
+++ b/test/unit/org/apache/cassandra/net/OutboundConnectionsTest.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import static org.apache.cassandra.net.MessagingService.current_version;
import static
org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
+import static
org.apache.cassandra.tcm.ClusterMetadata.EMPTY_METADATA_IDENTIFIER;
public class OutboundConnectionsTest
{
@@ -96,7 +97,7 @@ public class OutboundConnectionsTest
@Test
public void getConnection_Gossip()
{
- GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner",
new ArrayList<>(0));
+ GossipDigestSyn syn = new GossipDigestSyn("cluster", "partitioner",
EMPTY_METADATA_IDENTIFIER, new ArrayList<>(0));
Message<GossipDigestSyn> message = Message.out(Verb.GOSSIP_DIGEST_SYN,
syn);
Assert.assertEquals(ConnectionType.URGENT_MESSAGES,
connections.connectionFor(message).type());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]