Repository: cassandra Updated Branches: refs/heads/trunk 1e20d9513 -> 4cfaf855c
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java index 449a5dc..5dc8112 100644 --- a/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java +++ b/test/unit/org/apache/cassandra/repair/messages/RepairMessageSerializationsTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; +import com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -43,10 +44,14 @@ import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.NodePair; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.SessionSummary; +import org.apache.cassandra.streaming.StreamSummary; import org.apache.cassandra.utils.MerkleTrees; +import org.apache.cassandra.utils.UUIDGen; public class RepairMessageSerializationsTest { @@ -145,7 +150,7 @@ public class RepairMessageSerializationsTest InetAddress src = InetAddress.getByName("127.0.0.2"); InetAddress dst = InetAddress.getByName("127.0.0.3"); - SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges()); + SyncRequest msg = new SyncRequest(buildRepairJobDesc(), initiator, src, dst, buildTokenRanges(), PreviewKind.NONE); serializeRoundTrip(msg, SyncRequest.serializer); } @@ -154,7 +159,12 @@ public class RepairMessageSerializationsTest { InetAddress src = InetAddress.getByName("127.0.0.2"); InetAddress dst = InetAddress.getByName("127.0.0.3"); - SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new NodePair(src, dst), true); + List<SessionSummary> summaries = new ArrayList<>(); + summaries.add(new SessionSummary(src, dst, + Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)), + Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10)) + )); + SyncComplete msg = new SyncComplete(buildRepairJobDesc(), new NodePair(src, dst), true, summaries); serializeRoundTrip(msg, SyncComplete.serializer); } @@ -162,7 +172,8 @@ public class RepairMessageSerializationsTest public void prepareMessage() throws IOException { PrepareMessage msg = new PrepareMessage(UUID.randomUUID(), new ArrayList<TableId>() {{add(TableId.generate());}}, - buildTokenRanges(), true, 100000L, false); + buildTokenRanges(), true, 100000L, false, + PreviewKind.NONE); serializeRoundTrip(msg, PrepareMessage.serializer); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java index 8f8fe6d..9d5ccf4 100644 --- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -20,7 +20,6 @@ package org.apache.cassandra.service; import java.net.InetAddress; import java.util.*; -import java.util.concurrent.ExecutionException; import com.google.common.collect.Sets; import org.junit.Before; @@ -40,6 +39,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Refs; @@ -230,11 +230,19 @@ public class ActiveRepairServiceTest ColumnFamilyStore store = prepareColumnFamilyStore(); UUID prsId = UUID.randomUUID(); Set<SSTableReader> original = Sets.newHashSet(store.select(View.select(SSTableSet.CANONICAL, (s) -> !s.isRepaired())).sstables); - ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), + Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), + store.getPartitioner().getMinimumToken())), + true, System.currentTimeMillis(), true, PreviewKind.NONE); ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id, prsId); UUID prsId2 = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), store.getPartitioner().getMinimumToken())), true, System.currentTimeMillis(), true); + ActiveRepairService.instance.registerParentRepairSession(prsId2, FBUtilities.getBroadcastAddress(), + Collections.singletonList(store), + Collections.singleton(new Range<>(store.getPartitioner().getMinimumToken(), + store.getPartitioner().getMinimumToken())), + true, System.currentTimeMillis(), + true, PreviewKind.NONE); createSSTables(store, 2); ActiveRepairService.instance.getParentRepairSession(prsId).maybeSnapshot(store.metadata.id, prsId); try (Refs<SSTableReader> refs = store.getSnapshotSSTableReaders(prsId.toString())) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/service/SerializationsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/SerializationsTest.java b/test/unit/org/apache/cassandra/service/SerializationsTest.java index 66fdeec..a63dc69 100644 --- a/test/unit/org/apache/cassandra/service/SerializationsTest.java +++ b/test/unit/org/apache/cassandra/service/SerializationsTest.java @@ -20,10 +20,13 @@ package org.apache.cassandra.service; import java.io.IOException; import java.net.InetAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.UUID; +import com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -43,8 +46,13 @@ import org.apache.cassandra.repair.NodePair; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.repair.messages.*; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.SessionSummary; +import org.apache.cassandra.streaming.StreamSummary; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTrees; +import org.apache.cassandra.utils.UUIDGen; public class SerializationsTest extends AbstractSerializationsTester { @@ -115,7 +123,7 @@ public class SerializationsTest extends AbstractSerializationsTester // empty validation mt.addMerkleTree((int) Math.pow(2, 15), FULL_RANGE); - Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1); + Validator v0 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1, PreviewKind.NONE); ValidationComplete c0 = new ValidationComplete(DESC, mt); // validation with a tree @@ -123,7 +131,7 @@ public class SerializationsTest extends AbstractSerializationsTester mt.addMerkleTree(Integer.MAX_VALUE, FULL_RANGE); for (int i = 0; i < 10; i++) mt.split(p.getRandomToken()); - Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1); + Validator v1 = new Validator(DESC, FBUtilities.getBroadcastAddress(), -1, PreviewKind.NONE); ValidationComplete c1 = new ValidationComplete(DESC, mt); // validation failed @@ -175,8 +183,8 @@ public class SerializationsTest extends AbstractSerializationsTester InetAddress local = InetAddress.getByAddress(new byte[]{127, 0, 0, 1}); InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2}); InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3}); - SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE)); + SyncRequest message = new SyncRequest(DESC, local, src, dest, Collections.singleton(FULL_RANGE), PreviewKind.NONE); testRepairMessageWrite("service.SyncRequest.bin", message); } @@ -209,9 +217,14 @@ public class SerializationsTest extends AbstractSerializationsTester InetAddress src = InetAddress.getByAddress(new byte[]{127, 0, 0, 2}); InetAddress dest = InetAddress.getByAddress(new byte[]{127, 0, 0, 3}); // sync success - SyncComplete success = new SyncComplete(DESC, src, dest, true); + List<SessionSummary> summaries = new ArrayList<>(); + summaries.add(new SessionSummary(src, dest, + Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 5, 100)), + Lists.newArrayList(new StreamSummary(TableId.fromUUID(UUIDGen.getTimeUUID()), 500, 10)) + )); + SyncComplete success = new SyncComplete(DESC, src, dest, true, summaries); // sync fail - SyncComplete fail = new SyncComplete(DESC, src, dest, false); + SyncComplete fail = new SyncComplete(DESC, src, dest, false, Collections.emptyList()); testRepairMessageWrite("service.SyncComplete.bin", success, fail); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java index 84053d4..6abc2a2 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamSessionTest.java @@ -97,7 +97,8 @@ public class StreamSessionTest Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(ranges, Lists.newArrayList(cfs), - pendingRepair); + pendingRepair, + PreviewKind.NONE); Set<SSTableReader> sstables = new HashSet<>(); for (StreamSession.SSTableStreamingSections section: sections) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 57d40e9..21c8375 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -74,7 +74,7 @@ public class StreamTransferTaskTest public void testScheduleTimeout() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(peer, peer, null, 0, true, null); + StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); // create two sstables @@ -120,9 +120,9 @@ public class StreamTransferTaskTest public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception { InetAddress peer = FBUtilities.getBroadcastAddress(); - StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, null, false, null); + StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, null, false, null, PreviewKind.NONE); StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); - StreamSession session = new StreamSession(peer, peer, null, 0, true, null); + StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE); session.init(future); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);