This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a7820d103956ae72ffbb90f39fe6f2452658a708 Merge: 8699366 c2cfebf Author: Alex Petrov <[email protected]> AuthorDate: Fri Mar 27 19:08:31 2020 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 build.xml | 6 + src/java/org/apache/cassandra/net/MessageOut.java | 26 +- src/java/org/apache/cassandra/tools/NodeProbe.java | 11 + src/java/org/apache/cassandra/tools/NodeTool.java | 4 +- .../org/apache/cassandra/distributed/Cluster.java | 32 +- .../cassandra/distributed/UpgradeableCluster.java | 32 +- .../apache/cassandra/distributed/api/Feature.java | 24 -- .../cassandra/distributed/api/ICoordinator.java | 36 -- .../cassandra/distributed/api/IInstance.java | 57 ---- .../cassandra/distributed/api/IInstanceConfig.java | 60 ---- .../distributed/api/IIsolatedExecutor.java | 126 ------- .../apache/cassandra/distributed/api/IListen.java | 28 -- .../apache/cassandra/distributed/api/IMessage.java | 37 -- .../cassandra/distributed/api/IMessageFilters.java | 56 --- .../distributed/impl/AbstractCluster.java | 374 +++++---------------- .../cassandra/distributed/impl/Coordinator.java | 56 +-- .../impl/DelegatingInvokableInstance.java | 11 +- .../distributed/impl/DistributedTestSnitch.java | 31 +- .../distributed/impl/IInvokableInstance.java | 67 ---- .../distributed/impl/IUpgradeableInstance.java | 1 + .../cassandra/distributed/impl/Instance.java | 247 +++++++++++--- .../distributed/impl/InstanceClassLoader.java | 130 ------- .../cassandra/distributed/impl/InstanceConfig.java | 98 +++--- .../cassandra/distributed/impl/InstanceKiller.java | 50 +++ .../distributed/impl/IsolatedExecutor.java | 4 + .../apache/cassandra/distributed/impl/Listen.java | 1 - .../cassandra/distributed/impl/MessageFilters.java | 165 --------- .../impl/{Message.java => MessageImpl.java} | 27 +- .../distributed/impl/NetworkTopology.java | 137 -------- .../apache/cassandra/distributed/impl/RowUtil.java | 1 + .../cassandra/distributed/impl/TracingUtil.java | 2 +- .../cassandra/distributed/impl/Versions.java | 190 ----------- .../mock/nodetool/InternalNodeProbe.java | 36 +- .../mock/nodetool/InternalNodeProbeFactory.java | 11 +- .../cassandra/distributed/test/BootstrapTest.java | 50 +-- .../test/DistributedReadWritePathTest.java | 300 ----------------- .../distributed/test/DistributedTestBase.java | 166 --------- .../distributed/test/GossipSettlesTest.java | 16 +- .../cassandra/distributed/test/GossipTest.java | 4 +- .../distributed/test/MessageFiltersTest.java | 87 +++-- .../distributed/test/MessageForwardingTest.java | 10 +- .../distributed/test/NativeProtocolTest.java | 49 +-- .../distributed/test/NetworkTopologyTest.java | 40 ++- .../cassandra/distributed/test/NodeToolTest.java | 2 +- .../distributed/test/ResourceLeakTest.java | 12 +- .../SharedClusterTestBase.java} | 38 ++- .../distributed/test/SimpleReadWriteTest.java | 276 +++++++++++++++ .../cassandra/distributed/test/TestBaseImpl.java | 47 +++ .../upgrade/CompactStorage2to3UpgradeTest.java | 33 +- .../upgrade/MixedModeReadRepairTest.java | 8 +- .../cassandra/distributed/upgrade/UpgradeTest.java | 57 ++-- .../distributed/upgrade/UpgradeTestBase.java | 21 +- .../apache/cassandra/LogbackStatusListener.java | 2 +- 53 files changed, 1167 insertions(+), 2225 deletions(-) diff --cc build.xml index 400a707,2527883..161150a --- a/build.xml +++ b/build.xml @@@ -561,13 -524,8 +565,15 @@@ artifactId="cassandra-parent" version="${version}"/> <dependency groupId="junit" artifactId="junit"/> + <dependency groupId="org.mockito" artifactId="mockito-core" /> - <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/> ++ <dependency groupId="org.apache.cassandra" artifactId="dtest-api" /> + <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"> + <exclusion groupId="io.netty" artifactId="netty-buffer"/> + <exclusion groupId="io.netty" artifactId="netty-codec"/> + <exclusion groupId="io.netty" artifactId="netty-handler"/> + <exclusion groupId="io.netty" artifactId="netty-transport"/> + </dependency> + <dependency groupId="io.netty" artifactId="netty-all"/> <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/> <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/> diff --cc src/java/org/apache/cassandra/net/MessageOut.java index 4f41ee5,09ff63b..1d1dd49 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@@ -102,6 -108,6 +102,28 @@@ public class MessageOut<T public void serialize(DataOutputPlus out, int version) throws IOException { ++ serialize(out, from, verb, parameters, payload, serializer, version); ++ } ++ ++ public static <T> void serialize(DataOutputPlus out, ++ InetAddress from, ++ MessagingService.Verb verb, ++ Map<String, byte[]> parameters, ++ T payload, ++ int version) throws IOException ++ { ++ IVersionedSerializer<T> serializer = (IVersionedSerializer<T>) MessagingService.instance().verbSerializers.get(verb); ++ serialize(out, from, verb, parameters, payload, serializer, version); ++ } ++ ++ public static <T> void serialize(DataOutputPlus out, ++ InetAddress from, ++ MessagingService.Verb verb, ++ Map<String, byte[]> parameters, ++ T payload, ++ IVersionedSerializer<T> serializer, ++ int version) throws IOException ++ { CompactEndpointSerializationHelper.serialize(from, out); out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(verb, version).ordinal()); @@@ -113,21 -119,11 +135,21 @@@ out.write(entry.getValue()); } - long longSize = payloadSize(version); - assert longSize <= Integer.MAX_VALUE; // larger values are supported in sstables but not messages - out.writeInt((int) longSize); -- if (payload != null) - serializer.serialize(payload, out, version); ++ if (payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance) + { - try(DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get()) ++ try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get()) + { + serializer.serialize(payload, dob, version); + + int size = dob.getLength(); + out.writeInt(size); + out.write(dob.getData(), 0, size); + } + } + else + { + out.writeInt(0); + } } public int serializedSize(int version) diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java index 3b81ba3,2425821..2c4e409 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@@ -861,11 -819,10 +868,15 @@@ public class NodeProbe implements AutoC return spProxy; } + public MessagingServiceMBean getMessagingServiceProxy() + { + return msProxy; + } + + public StorageServiceMBean getStorageService() { + return ssProxy; + } + public GossiperMBean getGossProxy() { return gossProxy; diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index 97a3aa8,e2ebef0..d49679d --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@@ -62,8 -63,8 +64,8 @@@ public class Coordinator implements ICo return instance.async(() -> { try { - Tracing.instance.newSession(sessionId); + Tracing.instance.newSession(sessionId, Collections.emptyMap()); - return executeInternal(query, consistencyLevelOrigin, boundValues); + return executeInternal(query, consistencyLevelOrigin, boundValues).toObjectArrays(); } finally { @@@ -90,16 -95,22 +96,23 @@@ Integer.MAX_VALUE, null, null, - Server.CURRENT_VERSION)); + ProtocolVersion.CURRENT), + System.nanoTime()); if (res != null && res.kind == ResultMessage.Kind.ROWS) - return RowUtil.toObjects((ResultMessage.Rows) res); + { + ResultMessage.Rows rows = (ResultMessage.Rows) res; + String[] names = rows.result.metadata.names.stream().map(c -> c.name.toString()).toArray(String[]::new); + Object[][] results = RowUtil.toObjects(rows); + return new QueryResult(names, results); + } else - return new Object[][]{}; + { + return QueryResult.EMPTY; + } } - public Object[][] executeWithTracing(UUID sessionId, String query, Enum<?> consistencyLevelOrigin, Object... boundValues) + public Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevelOrigin, Object... boundValues) { return IsolatedExecutor.waitOn(asyncExecuteWithTracing(sessionId, query, consistencyLevelOrigin, boundValues)); } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index bf4889e,e8c45d8..759a636 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -71,7 -81,8 +82,6 @@@ import org.apache.cassandra.io.sstable. import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataOutputBuffer; - import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@@ -82,13 -93,16 +92,15 @@@ import org.apache.cassandra.service.Cli import org.apache.cassandra.service.PendingRangeCalculatorService; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; + import org.apache.cassandra.service.StorageServiceMBean; import org.apache.cassandra.streaming.StreamCoordinator; +import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.tools.NodeTool; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; + import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.Throwables; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; @@@ -248,7 -286,57 +284,33 @@@ public class Instance extends IsolatedE long timestamp = System.currentTimeMillis(); out.writeInt((int) timestamp); messageOut.serialize(out, version); - return new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from); + return new MessageImpl(messageOut.verb.ordinal(), out.toByteArray(), id, version, from); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + - public static IMessage serializeMessage(MessageIn<?> messageIn, int id, InetSocketAddress from, InetSocketAddress to) ++ public static IMessage serializeMessage(MessageIn messageIn, int id, InetSocketAddress from, InetSocketAddress to) + { + try (DataOutputBuffer out = new DataOutputBuffer(1024)) + { - // Serialize header + int version = MessagingService.instance().getVersion(to.getAddress()); + + out.writeInt(MessagingService.PROTOCOL_MAGIC); + out.writeInt(id); + long timestamp = System.currentTimeMillis(); + out.writeInt((int) timestamp); + - // Serialize the message itself - IVersionedSerializer serializer = MessagingService.instance().verbSerializers.get(messageIn.verb); - CompactEndpointSerializationHelper.serialize(from.getAddress(), out); - - out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(messageIn.verb, version).ordinal()); - out.writeInt(messageIn.parameters.size()); - for (Map.Entry<String, byte[]> entry : messageIn.parameters.entrySet()) - { - out.writeUTF(entry.getKey()); - out.writeInt(entry.getValue().length); - out.write(entry.getValue()); - } - - if (messageIn.payload != null && serializer != MessagingService.CallbackDeterminedSerializer.instance) - { - try (DataOutputBuffer dob = new DataOutputBuffer()) - { - serializer.serialize(messageIn.payload, dob, version); - - int size = dob.getLength(); - out.writeInt(size); - out.write(dob.getData(), 0, size); - } - } - else - { - out.writeInt(0); - } - ++ MessageOut.serialize(out, ++ from.getAddress(), ++ messageIn.verb, ++ messageIn.parameters, ++ messageIn.payload, ++ version); + + return new MessageImpl(messageIn.verb.ordinal(), out.toByteArray(), id, version, from); } catch (IOException e) { @@@ -327,12 -416,19 +390,12 @@@ id = Integer.parseInt(input.readUTF()); else id = input.readInt(); - if (imessage.id() != id) - throw new IllegalStateException(String.format("Message id mismatch: %d != %d", imessage.id(), id)); - - // make sure to readInt, even if cross_node_to is not enabled - int partial = input.readInt(); - - return Pair.create(MessageIn.read(input, version, id), partial); - //long currentTime = ApproximateTime.currentTimeMillis(); - //return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().getAddress(), input, currentTime)); + long currentTime = ApproximateTime.currentTimeMillis(); - return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().address, input, currentTime)); ++ return MessageIn.read(input, version, id, MessageIn.readConstructionTime(imessage.from().getAddress(), input, currentTime)); } - catch (IOException e) + catch (Throwable t) { - throw new RuntimeException(); + throw new RuntimeException(t); } } @@@ -356,8 -442,31 +419,8 @@@ } catch (Throwable t) { - throw new RuntimeException("Exception occurred on node " + broadcastAddressAndPort(), t); + throw new RuntimeException("Exception occurred on node " + broadcastAddress(), t); } - - MessageIn<Object> message = deserialized.left; - int partial = deserialized.right; - - long timestamp = System.currentTimeMillis(); - boolean isCrossNodeTimestamp = false; - - if (DatabaseDescriptor.hasCrossNodeTimeout()) - { - long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2); - isCrossNodeTimestamp = (timestamp != crossNodeTimestamp); - timestamp = crossNodeTimestamp; - } - - if (message == null) - { - // callback expired; nothing to do - return; - } - if (message.version <= MessagingService.current_version) - { - MessagingService.instance().receive(message, imessage.id(), timestamp, isCrossNodeTimestamp); - } }).run(); } @@@ -404,10 -507,10 +461,10 @@@ { mkdirs(); - assert config.networkTopology().contains(config.broadcastAddressAndPort()); + assert config.networkTopology().contains(config.broadcastAddress()); DistributedTestSnitch.assign(config.networkTopology()); - DatabaseDescriptor.setDaemonInitialized(); + DatabaseDescriptor.daemonInitialization(); DatabaseDescriptor.createAllDirectories(); // We need to persist this as soon as possible after startup checks. @@@ -530,16 -634,15 +588,16 @@@ UUID hostId = hostIds.get(i); Token token = tokens.get(i); Gossiper.runInGossipStageBlocking(() -> { - Gossiper.instance.initializeNodeUnsafe(ep.address, hostId, 1); - Gossiper.instance.injectApplicationState(ep.address, + Gossiper.instance.initializeNodeUnsafe(ep.getAddress(), hostId, 1); + Gossiper.instance.injectApplicationState(ep.getAddress(), ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(token))); - storageService.onChange(ep.address, + storageService.onChange(ep.getAddress(), ApplicationState.STATUS, new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(token))); - Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address)); + Gossiper.instance.realMarkAlive(ep.getAddress(), Gossiper.instance.getEndpointStateForEndpoint(ep.getAddress())); }); + int messagingVersion = cluster.get(ep).isShutdown() ? MessagingService.current_version : Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion()); diff --cc test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java index e774633,1904aa7..e7734da --- a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java +++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbeFactory.java @@@ -21,12 -21,19 +21,19 @@@ package org.apache.cassandra.distribute import java.io.IOException; import org.apache.cassandra.tools.NodeProbe; -import org.apache.cassandra.tools.INodeProbeFactory; +import org.apache.cassandra.tools.NodeProbeFactory; -public class InternalNodeProbeFactory implements INodeProbeFactory +public class InternalNodeProbeFactory extends NodeProbeFactory { + private final boolean withNotifications; + + public InternalNodeProbeFactory(boolean withNotifications) + { + this.withNotifications = withNotifications; + } + public NodeProbe create(String host, int port) throws IOException { - return new InternalNodeProbe(); + return new InternalNodeProbe(withNotifications); } public NodeProbe create(String host, int port, String username, String password) throws IOException { diff --cc test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java index ca41030,f4398da..bb8d7fb --- a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java @@@ -18,8 -18,8 +18,9 @@@ package org.apache.cassandra.distributed.test; + import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@@ -45,6 -61,6 +61,7 @@@ public class MessageFiltersTest extend int VERB1 = MessagingService.Verb.READ.ordinal(); int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal(); int VERB3 = MessagingService.Verb.READ_REPAIR.ordinal(); ++ int i1 = 1; int i2 = 2; int i3 = 3; @@@ -52,21 -68,22 +69,22 @@@ String MSG2 = "msg2"; MessageFilters filters = new MessageFilters(); - MessageFilters.Filter filter = filters.allVerbs().from(1).drop(); - Permit permit = inbound ? (from, to, msg) -> filters.permitInbound(from, to, msg) : (from, to, msg) -> filters.permitOutbound(from, to, msg); ++ Permit permit = inbound ? filters::permitInbound : filters::permitOutbound; - Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1))); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1))); - Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); + IMessageFilters.Filter filter = filters.allVerbs().inbound(inbound).from(1).drop(); + Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertFalse(permit.test(i1, i2, msg(VERB2, MSG1))); + Assert.assertFalse(permit.test(i1, i2, msg(VERB3, MSG1))); + Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1))); filter.off(); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i2, msg(VERB1, MSG1))); filters.reset(); - filters.verbs(VERB1).from(1).to(2).drop(); - Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1))); - Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1))); - Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1))); + filters.verbs(VERB1).inbound(inbound).from(1).to(2).drop(); + Assert.assertFalse(permit.test(i1, i2, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i1, i2, msg(VERB2, MSG1))); + Assert.assertTrue(permit.test(i2, i1, msg(VERB1, MSG1))); + Assert.assertTrue(permit.test(i2, i3, msg(VERB2, MSG1))); filters.reset(); AtomicInteger counter = new AtomicInteger(); @@@ -114,10 -131,10 +132,9 @@@ public byte[] bytes() { return msg.getBytes(); } public int id() { return 0; } public int version() { return 0; } - public InetAddressAndPort from() { return null; } + public InetSocketAddress from() { return null; } }; } -- @Test public void testFilters() throws Throwable { diff --cc test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java index fb5f758,808b95c..1c4850a --- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java @@@ -34,12 -33,11 +33,11 @@@ import org.junit.Ignore import org.junit.Test; import com.sun.management.HotSpotDiagnosticMXBean; - import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.distributed.Cluster; - import org.apache.cassandra.distributed.impl.InstanceConfig; + import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.IInstanceConfig; -import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.gms.Gossiper; - import org.apache.cassandra.service.CassandraDaemon; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SigarLibrary; diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java index c27c68f,81e580d..5970992 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java @@@ -36,22 -36,22 +36,22 @@@ public class UpgradeTest extends Upgrad public void upgradeTest() throws Throwable { new TestCase() - .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X) - .setup((cluster) -> { - cluster.schemaChange("CREATE TABLE " + DistributedTestBase.KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - .upgrade(Versions.Major.v22, Versions.Major.v30) ++ .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X) + .setup((cluster) -> { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - cluster.get(1).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); - cluster.get(3).executeInternal("INSERT INTO " + DistributedTestBase.KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); - }) - .runAfterClusterUpgrade((cluster) -> { - DistributedTestBase.assertRows(cluster.coordinator(1).execute("SELECT * FROM " + DistributedTestBase.KEYSPACE + ".tbl WHERE pk = ?", - ConsistencyLevel.ALL, - 1), - DistributedTestBase.row(1, 1, 1), - DistributedTestBase.row(1, 2, 2), - DistributedTestBase.row(1, 3, 3)); - }).run(); + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); + }) + .runAfterClusterUpgrade((cluster) -> { + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", + ConsistencyLevel.ALL, + 1), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); + }).run(); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
