http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java index 3b13cd8..6c8ff9d 100644 --- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -28,6 +28,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.utils.FBUtilities; @@ -47,6 +48,7 @@ public class RepairOption public static final String TRACE_KEY = "trace"; public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair"; public static final String PULL_REPAIR_KEY = "pullRepair"; + public static final String PREVIEW = "previewKind"; // we don't want to push nodes too much for repair public static final int MAX_JOB_THREADS = 4; @@ -136,6 +138,7 @@ public class RepairOption RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY)); boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY)); boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY)); + PreviewKind previewKind = PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString())); boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY)); boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY)); @@ -171,7 +174,7 @@ public class RepairOption } } - RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair); + RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, previewKind); // data centers String dataCentersStr = options.get(DATACENTERS_KEY); @@ -252,13 +255,14 @@ public class RepairOption private final int jobThreads; private final boolean isSubrangeRepair; private final boolean pullRepair; + private final PreviewKind previewKind; private final Collection<String> columnFamilies = new HashSet<>(); private final Collection<String> dataCenters = new HashSet<>(); private final Collection<String> hosts = new HashSet<>(); private final Collection<Range<Token>> ranges = new HashSet<>(); - public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair) + public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, PreviewKind previewKind) { if (FBUtilities.isWindows && (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) && @@ -277,6 +281,7 @@ public class RepairOption this.ranges.addAll(ranges); this.isSubrangeRepair = isSubrangeRepair; this.pullRepair = pullRepair; + this.previewKind = previewKind; } public RepairParallelism getParallelism() @@ -339,6 +344,16 @@ public class RepairOption return isSubrangeRepair; } + public PreviewKind getPreviewKind() + { + return previewKind; + } + + public boolean isPreview() + { + return previewKind.isPreview(); + } + public boolean isInLocalDCOnly() { return dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()); } @@ -347,16 +362,17 @@ public class RepairOption public String toString() { return "repair options (" + - "parallelism: " + parallelism + - ", primary range: " + primaryRange + - ", incremental: " + incremental + - ", job threads: " + jobThreads + - ", ColumnFamilies: " + columnFamilies + - ", dataCenters: " + dataCenters + - ", hosts: " + hosts + - ", # of ranges: " + ranges.size() + - ", pull repair: " + pullRepair + - ')'; + "parallelism: " + parallelism + + ", primary range: " + primaryRange + + ", incremental: " + incremental + + ", job threads: " + jobThreads + + ", ColumnFamilies: " + columnFamilies + + ", dataCenters: " + dataCenters + + ", hosts: " + hosts + + ", previewKind: " + previewKind + + ", # of ranges: " + ranges.size() + + ", pull repair: " + pullRepair + + ')'; } public Map<String, String> asMap() @@ -373,6 +389,7 @@ public class RepairOption options.put(TRACE_KEY, Boolean.toString(trace)); options.put(RANGES_KEY, Joiner.on(",").join(ranges)); options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair)); + options.put(PREVIEW, previewKind.toString()); return options; } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/SyncComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java index 178e710..7b68daf 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java @@ -19,6 +19,8 @@ package org.apache.cassandra.repair.messages; import java.io.IOException; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import org.apache.cassandra.db.TypeSizes; @@ -26,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.repair.NodePair; import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.streaming.SessionSummary; /** * @@ -40,16 +43,20 @@ public class SyncComplete extends RepairMessage /** true if sync success, false otherwise */ public final boolean success; - public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success) + public final List<SessionSummary> summaries; + + public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries) { super(Type.SYNC_COMPLETE, desc); this.nodes = nodes; this.success = success; + this.summaries = summaries; } - public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success) + public SyncComplete(RepairJobDesc desc, InetAddress endpoint1, InetAddress endpoint2, boolean success, List<SessionSummary> summaries) { super(Type.SYNC_COMPLETE, desc); + this.summaries = summaries; this.nodes = new NodePair(endpoint1, endpoint2); this.success = success; } @@ -63,13 +70,14 @@ public class SyncComplete extends RepairMessage return messageType == other.messageType && desc.equals(other.desc) && success == other.success && - nodes.equals(other.nodes); + nodes.equals(other.nodes) && + summaries.equals(other.summaries); } @Override public int hashCode() { - return Objects.hash(messageType, desc, success, nodes); + return Objects.hash(messageType, desc, success, nodes, summaries); } private static class SyncCompleteSerializer implements MessageSerializer<SyncComplete> @@ -79,13 +87,28 @@ public class SyncComplete extends RepairMessage RepairJobDesc.serializer.serialize(message.desc, out, version); NodePair.serializer.serialize(message.nodes, out, version); out.writeBoolean(message.success); + + out.writeInt(message.summaries.size()); + for (SessionSummary summary: message.summaries) + { + SessionSummary.serializer.serialize(summary, out, version); + } } public SyncComplete deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); NodePair nodes = NodePair.serializer.deserialize(in, version); - return new SyncComplete(desc, nodes, in.readBoolean()); + boolean success = in.readBoolean(); + + int numSummaries = in.readInt(); + List<SessionSummary> summaries = new ArrayList<>(numSummaries); + for (int i=0; i<numSummaries; i++) + { + summaries.add(SessionSummary.serializer.deserialize(in, version)); + } + + return new SyncComplete(desc, nodes, success, summaries); } public long serializedSize(SyncComplete message, int version) @@ -93,6 +116,13 @@ public class SyncComplete extends RepairMessage long size = RepairJobDesc.serializer.serializedSize(message.desc, version); size += NodePair.serializer.serializedSize(message.nodes, version); size += TypeSizes.sizeof(message.success); + + size += TypeSizes.sizeof(message.summaries.size()); + for (SessionSummary summary: message.summaries) + { + size += SessionSummary.serializer.serializedSize(summary, version); + } + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/repair/messages/SyncRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java index e31cc6c..01601e2 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncRequest.java @@ -33,6 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.streaming.PreviewKind; /** * Body part of SYNC_REQUEST repair message. @@ -48,14 +49,16 @@ public class SyncRequest extends RepairMessage public final InetAddress src; public final InetAddress dst; public final Collection<Range<Token>> ranges; + public final PreviewKind previewKind; - public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges) + public SyncRequest(RepairJobDesc desc, InetAddress initiator, InetAddress src, InetAddress dst, Collection<Range<Token>> ranges, PreviewKind previewKind) { super(Type.SYNC_REQUEST, desc); this.initiator = initiator; this.src = src; this.dst = dst; this.ranges = ranges; + this.previewKind = previewKind; } @Override @@ -69,13 +72,14 @@ public class SyncRequest extends RepairMessage initiator.equals(req.initiator) && src.equals(req.src) && dst.equals(req.dst) && - ranges.equals(req.ranges); + ranges.equals(req.ranges) && + previewKind == req.previewKind; } @Override public int hashCode() { - return Objects.hash(messageType, desc, initiator, src, dst, ranges); + return Objects.hash(messageType, desc, initiator, src, dst, ranges, previewKind); } public static class SyncRequestSerializer implements MessageSerializer<SyncRequest> @@ -92,6 +96,7 @@ public class SyncRequest extends RepairMessage MessagingService.validatePartitioner(range); AbstractBounds.tokenSerializer.serialize(range, out, version); } + out.writeInt(message.previewKind.getSerializationVal()); } public SyncRequest deserialize(DataInputPlus in, int version) throws IOException @@ -104,7 +109,8 @@ public class SyncRequest extends RepairMessage List<Range<Token>> ranges = new ArrayList<>(rangesCount); for (int i = 0; i < rangesCount; ++i) ranges.add((Range<Token>) AbstractBounds.tokenSerializer.deserialize(in, MessagingService.globalPartitioner(), version)); - return new SyncRequest(desc, owner, src, dst, ranges); + PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); + return new SyncRequest(desc, owner, src, dst, ranges, previewKind); } public long serializedSize(SyncRequest message, int version) @@ -114,6 +120,7 @@ public class SyncRequest extends RepairMessage size += TypeSizes.sizeof(message.ranges.size()); for (Range<Token> range : message.ranges) size += AbstractBounds.tokenSerializer.serializedSize(range, version); + size += TypeSizes.sizeof(message.previewKind.getSerializationVal()); return size; } } @@ -126,6 +133,7 @@ public class SyncRequest extends RepairMessage ", src=" + src + ", dst=" + dst + ", ranges=" + ranges + + ", previewKind=" + previewKind + "} " + super.toString(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index fd98b37..aadf7c1 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -61,6 +61,7 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.RepairSession; @@ -167,6 +168,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai Set<InetAddress> endpoints, boolean isConsistent, boolean pullRepair, + PreviewKind previewKind, ListeningExecutorService executor, String... cfnames) { @@ -176,7 +178,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai if (cfnames.length == 0) return null; - final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, cfnames); + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, previewKind, cfnames); sessions.put(session.getId(), session); // register listeners @@ -319,7 +321,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai { // we only want repairedAt for incremental repairs, for non incremental repairs, UNREPAIRED_SSTABLE will preserve repairedAt on streamed sstables long repairedAt = options.isIncremental() ? Clock.instance.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE; - registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal()); + registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind()); final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size()); final AtomicBoolean status = new AtomicBoolean(true); final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>()); @@ -351,7 +353,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai { if (FailureDetector.instance.isAlive(neighbour)) { - PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal()); + PrepareMessage message = new PrepareMessage(parentRepairSession, tableIds, options.getRanges(), options.isIncremental(), repairedAt, options.isGlobal(), options.getPreviewKind()); MessageOut<RepairMessage> msg = message.createMessage(); MessagingService.instance().sendRR(msg, neighbour, callback, DatabaseDescriptor.getRpcTimeout(), true); } @@ -386,7 +388,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai throw new RuntimeException(errorMsg); } - public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) + public void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) { assert isIncremental || repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE; if (!registeredForEndpointChanges) @@ -396,7 +398,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai registeredForEndpointChanges = true; } - parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal)); + parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, isIncremental, repairedAt, isGlobal, previewKind)); } public ParentRepairSession getParentRepairSession(UUID parentSessionId) @@ -444,7 +446,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai case SYNC_COMPLETE: // one of replica is synced. SyncComplete sync = (SyncComplete) message; - session.syncComplete(desc, sync.nodes, sync.success); + session.syncComplete(desc, sync.nodes, sync.success, sync.summaries); break; default: break; @@ -464,8 +466,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai public final boolean isGlobal; public final long repairedAt; public final InetAddress coordinator; + public final PreviewKind previewKind; - public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal) + public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, boolean isIncremental, long repairedAt, boolean isGlobal, PreviewKind previewKind) { this.coordinator = coordinator; for (ColumnFamilyStore cfs : columnFamilyStores) @@ -476,6 +479,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai this.repairedAt = repairedAt; this.isIncremental = isIncremental; this.isGlobal = isGlobal; + this.previewKind = previewKind; + } + + public boolean isPreview() + { + return previewKind != PreviewKind.NONE; + } + + public Predicate<SSTableReader> getPreviewPredicate() + { + switch (previewKind) + { + case ALL: + return (s) -> true; + case REPAIRED: + return (s) -> s.isRepaired(); + case UNREPAIRED: + return (s) -> !s.isRepaired(); + default: + throw new RuntimeException("Can't get preview predicate for preview kind " + previewKind); + } } public synchronized void maybeSnapshot(TableId tableId, UUID parentSessionId) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 86340a5..5f734c9 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -64,10 +64,12 @@ public class ConnectionHandler private IncomingMessageHandler incoming; private OutgoingMessageHandler outgoing; + private final boolean isPreview; - ConnectionHandler(StreamSession session, int incomingSocketTimeout) + ConnectionHandler(StreamSession session, int incomingSocketTimeout, boolean isPreview) { this.session = session; + this.isPreview = isPreview; this.incoming = new IncomingMessageHandler(session, incomingSocketTimeout); this.outgoing = new OutgoingMessageHandler(session); } @@ -142,6 +144,9 @@ public class ConnectionHandler if (outgoing.isClosed()) throw new RuntimeException("Outgoing stream handler has been closed"); + if (message.type == StreamMessage.Type.FILE && isPreview) + throw new RuntimeException("Cannot send file messages for preview streaming sessions"); + outgoing.enqueue(message); } @@ -191,14 +196,14 @@ public class ConnectionHandler @SuppressWarnings("resource") private void sendInitMessage() throws IOException { - StreamInitMessage message = new StreamInitMessage( - FBUtilities.getBroadcastAddress(), - session.sessionIndex(), - session.planId(), - session.streamOperation(), - !isOutgoingHandler, - session.keepSSTableLevel(), - session.getPendingRepair()); + StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), + session.sessionIndex(), + session.planId(), + session.streamOperation(), + !isOutgoingHandler, + session.keepSSTableLevel(), + session.getPendingRepair(), + session.getPreviewKind()); ByteBuffer messageBuf = message.createMessage(false, protocolVersion); DataOutputStreamPlus out = getWriteChannel(socket); out.write(messageBuf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/PreviewKind.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java new file mode 100644 index 0000000..3b4d2a0 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming; + + +import java.util.UUID; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; + +import org.apache.cassandra.io.sstable.format.SSTableReader; + +public enum PreviewKind +{ + NONE(0, null), + ALL(1, Predicates.alwaysTrue()), + UNREPAIRED(2, Predicates.not(SSTableReader::isRepaired)), + REPAIRED(3, SSTableReader::isRepaired); + + private final int serializationVal; + private final Predicate<SSTableReader> streamingPredicate; + + PreviewKind(int serializationVal, Predicate<SSTableReader> streamingPredicate) + { + assert ordinal() == serializationVal; + this.serializationVal = serializationVal; + this.streamingPredicate = streamingPredicate; + } + + public int getSerializationVal() + { + return serializationVal; + } + + public static PreviewKind deserialize(int serializationVal) + { + return values()[serializationVal]; + } + + public Predicate<SSTableReader> getStreamingPredicate() + { + return streamingPredicate; + } + + public boolean isPreview() + { + return this != NONE; + } + + public String logPrefix() + { + return isPreview() ? "preview repair" : "repair"; + } + + public String logPrefix(UUID sessionId) + { + return '[' + logPrefix() + " #" + sessionId.toString() + ']'; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/SessionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java index 3bcb20c..1521614 100644 --- a/src/java/org/apache/cassandra/streaming/SessionInfo.java +++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java @@ -27,6 +27,8 @@ import com.google.common.base.Predicate; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.cassandra.utils.FBUtilities; + /** * Stream session info. */ @@ -190,4 +192,9 @@ public final class SessionInfo implements Serializable }); return Iterables.size(completed); } + + public SessionSummary createSummary() + { + return new SessionSummary(FBUtilities.getBroadcastAddress(), peer, receivingSummaries, sendingSummaries); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/SessionSummary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/SessionSummary.java b/src/java/org/apache/cassandra/streaming/SessionSummary.java new file mode 100644 index 0000000..d52c2ca --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/SessionSummary.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.streaming; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.serializers.InetAddressSerializer; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class SessionSummary +{ + public final InetAddress coordinator; + public final InetAddress peer; + /** Immutable collection of receiving summaries */ + public final Collection<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final Collection<StreamSummary> sendingSummaries; + + public SessionSummary(InetAddress coordinator, InetAddress peer, + Collection<StreamSummary> receivingSummaries, + Collection<StreamSummary> sendingSummaries) + { + assert coordinator != null; + assert peer != null; + assert receivingSummaries != null; + assert sendingSummaries != null; + + this.coordinator = coordinator; + this.peer = peer; + this.receivingSummaries = receivingSummaries; + this.sendingSummaries = sendingSummaries; + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SessionSummary summary = (SessionSummary) o; + + if (!coordinator.equals(summary.coordinator)) return false; + if (!peer.equals(summary.peer)) return false; + if (!receivingSummaries.equals(summary.receivingSummaries)) return false; + return sendingSummaries.equals(summary.sendingSummaries); + } + + public int hashCode() + { + int result = coordinator.hashCode(); + result = 31 * result + peer.hashCode(); + result = 31 * result + receivingSummaries.hashCode(); + result = 31 * result + sendingSummaries.hashCode(); + return result; + } + + public static IVersionedSerializer<SessionSummary> serializer = new IVersionedSerializer<SessionSummary>() + { + public void serialize(SessionSummary summary, DataOutputPlus out, int version) throws IOException + { + ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator), out); + ByteBufferUtil.writeWithLength(InetAddressSerializer.instance.serialize(summary.peer), out); + + out.writeInt(summary.receivingSummaries.size()); + for (StreamSummary streamSummary: summary.receivingSummaries) + { + StreamSummary.serializer.serialize(streamSummary, out, version); + } + + out.writeInt(summary.sendingSummaries.size()); + for (StreamSummary streamSummary: summary.sendingSummaries) + { + StreamSummary.serializer.serialize(streamSummary, out, version); + } + } + + public SessionSummary deserialize(DataInputPlus in, int version) throws IOException + { + InetAddress coordinator = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in)); + InetAddress peer = InetAddressSerializer.instance.deserialize(ByteBufferUtil.readWithLength(in)); + + int numRcvd = in.readInt(); + List<StreamSummary> receivingSummaries = new ArrayList<>(numRcvd); + for (int i=0; i<numRcvd; i++) + { + receivingSummaries.add(StreamSummary.serializer.deserialize(in, version)); + } + + int numSent = in.readInt(); + List<StreamSummary> sendingSummaries = new ArrayList<>(numRcvd); + for (int i=0; i<numSent; i++) + { + sendingSummaries.add(StreamSummary.serializer.deserialize(in, version)); + } + + return new SessionSummary(coordinator, peer, receivingSummaries, sendingSummaries); + } + + public long serializedSize(SessionSummary summary, int version) + { + long size = 0; + size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.coordinator)); + size += ByteBufferUtil.serializedSizeWithLength(InetAddressSerializer.instance.serialize(summary.peer)); + + size += TypeSizes.sizeof(summary.receivingSummaries.size()); + for (StreamSummary streamSummary: summary.receivingSummaries) + { + size += StreamSummary.serializer.serializedSize(streamSummary, version); + } + size += TypeSizes.sizeof(summary.sendingSummaries.size()); + for (StreamSummary streamSummary: summary.sendingSummaries) + { + size += StreamSummary.serializer.serializedSize(streamSummary, version); + } + return size; + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 6aa34cd..9059f45 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -49,15 +49,17 @@ public class StreamCoordinator private final boolean keepSSTableLevel; private Iterator<StreamSession> sessionsToConnect = null; private final UUID pendingRepair; + private final PreviewKind previewKind; public StreamCoordinator(int connectionsPerHost, boolean keepSSTableLevel, StreamConnectionFactory factory, - boolean connectSequentially, UUID pendingRepair) + boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind) { this.connectionsPerHost = connectionsPerHost; this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; this.connectSequentially = connectSequentially; this.pendingRepair = pendingRepair; + this.previewKind = previewKind; } public void setConnectionFactory(StreamConnectionFactory factory) @@ -293,7 +295,7 @@ public class StreamCoordinator // create if (streamSessions.size() < connectionsPerHost) { - StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair); + StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, pendingRepair, previewKind); streamSessions.put(++lastReturned, session); return session; } @@ -325,7 +327,7 @@ public class StreamCoordinator StreamSession session = streamSessions.get(id); if (session == null) { - session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair); + session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, pendingRepair, previewKind); streamSessions.put(id, session); } return session; http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index b5a6214..05a8d30 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -22,9 +22,10 @@ import java.util.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.UUIDGen; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; + /** * {@link StreamPlan} is a helper class that builds StreamOperation of given configuration. * @@ -47,20 +48,20 @@ public class StreamPlan */ public StreamPlan(StreamOperation streamOperation) { - this(streamOperation, 1, false, false, null); + this(streamOperation, 1, false, false, NO_PENDING_REPAIR, PreviewKind.NONE); } public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially) { - this(streamOperation, 1, keepSSTableLevels, connectSequentially, null); + this(streamOperation, 1, keepSSTableLevels, connectSequentially, NO_PENDING_REPAIR, PreviewKind.NONE); } public StreamPlan(StreamOperation streamOperation, int connectionsPerHost, boolean keepSSTableLevels, - boolean connectSequentially, UUID pendingRepair) + boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind) { this.streamOperation = streamOperation; this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, new DefaultConnectionFactory(), - connectSequentially, pendingRepair); + connectSequentially, pendingRepair, previewKind); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index b7e475a..34e7cc8 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,6 +93,8 @@ public class StreamReceiveTask extends StreamTask */ public synchronized void received(SSTableMultiWriter sstable) { + Preconditions.checkState(!session.isPreview(), "we should never receive sstables when previewing"); + if (done) { logger.warn("[{}] Received sstable {} on already finished stream received task. Aborting sstable.", session.planId(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 7845986..67d7d0d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -71,9 +71,9 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair) + private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, UUID pendingRepair, PreviewKind previewKind) { - this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair)); + this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, new DefaultConnectionFactory(), false, pendingRepair, previewKind)); } static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, @@ -107,7 +107,8 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> boolean isForOutgoing, int version, boolean keepSSTableLevel, - UUID pendingRepair) throws IOException + UUID pendingRepair, + PreviewKind previewKind) throws IOException { StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) @@ -115,7 +116,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription()); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair); + future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, pendingRepair, previewKind); StreamManager.instance.registerReceiving(future); } future.attachConnection(from, sessionIndex, connection, isForOutgoing, version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index adb8e79..5ca9938 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -165,6 +165,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber private final boolean keepSSTableLevel; private ScheduledFuture<?> keepAliveFuture = null; private final UUID pendingRepair; + private final PreviewKind previewKind; public static enum State { @@ -181,12 +182,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber /** * Create new streaming session with the peer. - * - * @param peer Address of streaming peer + * @param peer Address of streaming peer * @param connecting Actual connecting address * @param factory is used for establishing connection */ - public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair) + public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) { this.peer = peer; this.connecting = connecting; @@ -194,10 +194,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber this.factory = factory; this.handler = new ConnectionHandler(this, isKeepAliveSupported()? (int)TimeUnit.SECONDS.toMillis(2 * DatabaseDescriptor.getStreamingKeepAlivePeriod()) : - DatabaseDescriptor.getStreamingSocketTimeout()); + DatabaseDescriptor.getStreamingSocketTimeout(), previewKind.isPreview()); this.metrics = StreamingMetrics.get(connecting); this.keepSSTableLevel = keepSSTableLevel; this.pendingRepair = pendingRepair; + this.previewKind = previewKind; } public UUID planId() @@ -225,6 +226,16 @@ public class StreamSession implements IEndpointStateChangeSubscriber return pendingRepair; } + public boolean isPreview() + { + return previewKind.isPreview(); + } + + public PreviewKind getPreviewKind() + { + return previewKind; + } + public LifecycleTransaction getTransaction(TableId tableId) { assert receivers.containsKey(tableId); @@ -314,7 +325,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber flushSSTables(stores); List<Range<Token>> normalizedRanges = Range.normalize(ranges); - List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair); + List<SSTableStreamingSections> sections = getSSTableSectionsForRanges(normalizedRanges, stores, pendingRepair, previewKind); try { addTransferFiles(sections); @@ -356,7 +367,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber } @VisibleForTesting - public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair) + public static List<SSTableStreamingSections> getSSTableSectionsForRanges(Collection<Range<Token>> ranges, Collection<ColumnFamilyStore> stores, UUID pendingRepair, PreviewKind previewKind) { Refs<SSTableReader> refs = new Refs<>(); try @@ -370,7 +381,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber Set<SSTableReader> sstables = Sets.newHashSet(); SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL)); Predicate<SSTableReader> predicate; - if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR) + if (previewKind.isPreview()) + { + predicate = previewKind.getStreamingPredicate(); + } + else if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR) { predicate = Predicates.alwaysTrue(); } @@ -620,6 +635,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber handler.sendMessage(prepare); } + if (isPreview()) + { + completePreview(); + return; + } + // if there are files to stream if (!maybeCompleted()) startStreamingFiles(); @@ -650,6 +671,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber */ public void receive(IncomingFileMessage message) { + if (isPreview()) + { + throw new RuntimeException("Cannot receive files for preview session"); + } + long headerSize = message.header.size(); StreamingMetrics.totalIncomingBytes.inc(headerSize); metrics.incomingBytes.inc(headerSize); @@ -753,6 +779,22 @@ public class StreamSession implements IEndpointStateChangeSubscriber closeSession(State.FAILED); } + private void completePreview() + { + try + { + state(State.WAIT_COMPLETE); + closeSession(State.COMPLETE); + } + finally + { + // aborting the tasks here needs to be the last thing we do so that we + // accurately report expected streaming, but don't leak any sstable refs + for (StreamTask task : Iterables.concat(receivers.values(), transfers.values())) + task.abort(); + } + } + private boolean maybeCompleted() { boolean completed = receivers.isEmpty() && transfers.isEmpty(); @@ -803,6 +845,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber streamResult.handleSessionPrepared(this); state(State.STREAMING); + for (StreamTransferTask task : transfers.values()) { Collection<OutgoingFileMessage> messages = task.getFileMessages(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/StreamState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java b/src/java/org/apache/cassandra/streaming/StreamState.java index 4ee3c8d..be37677 100644 --- a/src/java/org/apache/cassandra/streaming/StreamState.java +++ b/src/java/org/apache/cassandra/streaming/StreamState.java @@ -18,11 +18,13 @@ package org.apache.cassandra.streaming; import java.io.Serializable; +import java.util.List; import java.util.Set; import java.util.UUID; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; /** * Current snapshot of streaming progress. @@ -50,4 +52,9 @@ public class StreamState implements Serializable } }); } + + public List<SessionSummary> createSummaries() + { + return Lists.newArrayList(Iterables.transform(sessions, SessionInfo::createSummary)); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index 4619561..ceaa4d1 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -31,6 +31,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.UUIDSerializer; /** @@ -50,8 +51,9 @@ public class StreamInitMessage public final boolean isForOutgoing; public final boolean keepSSTableLevel; public final UUID pendingRepair; + public final PreviewKind previewKind; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair) + public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, UUID pendingRepair, PreviewKind previewKind) { this.from = from; this.sessionIndex = sessionIndex; @@ -60,6 +62,7 @@ public class StreamInitMessage this.isForOutgoing = isForOutgoing; this.keepSSTableLevel = keepSSTableLevel; this.pendingRepair = pendingRepair; + this.previewKind = previewKind; } /** @@ -120,6 +123,7 @@ public class StreamInitMessage { UUIDSerializer.serializer.serialize(message.pendingRepair, out, MessagingService.current_version); } + out.writeInt(message.previewKind.getSerializationVal()); } public StreamInitMessage deserialize(DataInputPlus in, int version) throws IOException @@ -132,7 +136,8 @@ public class StreamInitMessage boolean keepSSTableLevel = in.readBoolean(); UUID pendingRepair = in.readBoolean() ? UUIDSerializer.serializer.deserialize(in, version) : null; - return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair); + PreviewKind previewKind = PreviewKind.deserialize(in.readInt()); + return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, pendingRepair, previewKind); } public long serializedSize(StreamInitMessage message, int version) @@ -148,6 +153,7 @@ public class StreamInitMessage { size += UUIDSerializer.serializer.serializedSize(message.pendingRepair, MessagingService.current_version); } + size += TypeSizes.sizeof(message.previewKind.getSerializationVal()); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/src/java/org/apache/cassandra/tools/nodetool/Repair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java index 48f929f..317a677 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java @@ -32,6 +32,7 @@ import java.util.Set; import com.google.common.collect.Sets; import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairParallelism; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.tools.NodeProbe; @@ -73,6 +74,12 @@ public class Repair extends NodeToolCmd @Option(title = "full", name = {"-full", "--full"}, description = "Use -full to issue a full repair.") private boolean fullRepair = false; + @Option(title = "preview", name = {"-prv", "--preview"}, description = "Determine ranges and amount of data to be streamed, but don't actually perform repair") + private boolean preview = false; + + @Option(title = "validate", name = {"-vd", "--validate"}, description = "Checks that repaired data is in sync between nodes. Out of sync repaired data indicates a full repair should be run.") + private boolean validate = false; + @Option(title = "job_threads", name = {"-j", "--job-threads"}, description = "Number of threads to run repair jobs. " + "Usually this means number of CFs to repair concurrently. " + "WARNING: increasing this puts more load on repairing nodes, so be careful. (default: 1, max: 4)") @@ -84,6 +91,26 @@ public class Repair extends NodeToolCmd @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.") private boolean pullRepair = false; + private PreviewKind getPreviewKind() + { + if (validate) + { + return PreviewKind.REPAIRED; + } + else if (preview && fullRepair) + { + return PreviewKind.ALL; + } + else if (preview) + { + return PreviewKind.UNREPAIRED; + } + else + { + return PreviewKind.NONE; + } + } + @Override public void execute(NodeProbe probe) { @@ -112,6 +139,8 @@ public class Repair extends NodeToolCmd options.put(RepairOption.TRACE_KEY, Boolean.toString(trace)); options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ",")); options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair)); + options.put(RepairOption.PREVIEW, getPreviewKind().toString()); + if (!startToken.isEmpty() || !endToken.isEmpty()) { options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/gms.EndpointState.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/gms.EndpointState.bin b/test/data/serialization/4.0/gms.EndpointState.bin new file mode 100644 index 0000000..fb7d168 Binary files /dev/null and b/test/data/serialization/4.0/gms.EndpointState.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/gms.Gossip.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/gms.Gossip.bin b/test/data/serialization/4.0/gms.Gossip.bin new file mode 100644 index 0000000..af5ac57 Binary files /dev/null and b/test/data/serialization/4.0/gms.Gossip.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.SyncComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.SyncComplete.bin b/test/data/serialization/4.0/service.SyncComplete.bin new file mode 100644 index 0000000..ba84349 Binary files /dev/null and b/test/data/serialization/4.0/service.SyncComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.SyncRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.SyncRequest.bin b/test/data/serialization/4.0/service.SyncRequest.bin new file mode 100644 index 0000000..6d688a4 Binary files /dev/null and b/test/data/serialization/4.0/service.SyncRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.ValidationComplete.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.ValidationComplete.bin b/test/data/serialization/4.0/service.ValidationComplete.bin new file mode 100644 index 0000000..7433d64 Binary files /dev/null and b/test/data/serialization/4.0/service.ValidationComplete.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/service.ValidationRequest.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/service.ValidationRequest.bin b/test/data/serialization/4.0/service.ValidationRequest.bin new file mode 100644 index 0000000..a00763b Binary files /dev/null and b/test/data/serialization/4.0/service.ValidationRequest.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/data/serialization/4.0/utils.EstimatedHistogram.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/4.0/utils.EstimatedHistogram.bin b/test/data/serialization/4.0/utils.EstimatedHistogram.bin new file mode 100644 index 0000000..e878eda Binary files /dev/null and b/test/data/serialization/4.0/utils.EstimatedHistogram.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/AbstractSerializationsTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java index 04cb083..3611f0e 100644 --- a/test/unit/org/apache/cassandra/AbstractSerializationsTester.java +++ b/test/unit/org/apache/cassandra/AbstractSerializationsTester.java @@ -36,10 +36,11 @@ import java.util.Map; public class AbstractSerializationsTester { - protected static final String CUR_VER = System.getProperty("cassandra.version", "3.0"); + protected static final String CUR_VER = System.getProperty("cassandra.version", "4.0"); protected static final Map<String, Integer> VERSION_MAP = new HashMap<String, Integer> () {{ put("3.0", MessagingService.VERSION_30); + put("4.0", MessagingService.VERSION_40); }}; protected static final boolean EXECUTE_WRITES = Boolean.getBoolean("cassandra.test-serialization-writes"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java index 0ee85c6..b9e3c17 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionManagerGetSSTablesForValidationTest.java @@ -41,6 +41,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.schema.KeyspaceParams; @@ -106,7 +107,8 @@ public class CompactionManagerGetSSTablesForValidationTest Sets.newHashSet(range), incremental, incremental ? System.currentTimeMillis() : ActiveRepairService.UNREPAIRED_SSTABLE, - true); + true, + PreviewKind.NONE); desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, Collections.singleton(range)); } @@ -135,7 +137,7 @@ public class CompactionManagerGetSSTablesForValidationTest modifySSTables(); // get sstables for repair - Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true); + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), true, PreviewKind.NONE); Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); Assert.assertNotNull(sstables); Assert.assertEquals(1, sstables.size()); @@ -150,7 +152,7 @@ public class CompactionManagerGetSSTablesForValidationTest modifySSTables(); // get sstables for repair - Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false); + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE); Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); Assert.assertNotNull(sstables); Assert.assertEquals(2, sstables.size()); @@ -166,7 +168,7 @@ public class CompactionManagerGetSSTablesForValidationTest modifySSTables(); // get sstables for repair - Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false); + Validator validator = new Validator(desc, coordinator, FBUtilities.nowInSeconds(), false, PreviewKind.NONE); Set<SSTableReader> sstables = Sets.newHashSet(CompactionManager.instance.getSSTablesToValidate(cfs, validator)); Assert.assertNotNull(sstables); Assert.assertEquals(3, sstables.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java index 0a38cd9..360a2cd 100644 --- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java @@ -51,6 +51,7 @@ import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.notifications.SSTableAddedNotification; import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.schema.CompactionParams; @@ -193,9 +194,16 @@ public class LeveledCompactionStrategyTest Range<Token> range = new Range<>(Util.token(""), Util.token("")); int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(FBUtilities.nowInSeconds()); UUID parentRepSession = UUID.randomUUID(); - ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, ActiveRepairService.UNREPAIRED_SSTABLE, true); + ActiveRepairService.instance.registerParentRepairSession(parentRepSession, + FBUtilities.getBroadcastAddress(), + Arrays.asList(cfs), + Arrays.asList(range), + false, + ActiveRepairService.UNREPAIRED_SSTABLE, + true, + PreviewKind.NONE); RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, Arrays.asList(range)); - Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore, PreviewKind.NONE); CompactionManager.instance.submitValidation(cfs, validator).get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index f11362f..b5f8036 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.FBUtilities; @@ -50,7 +51,7 @@ public class StreamStateStoreTest Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); InetAddress local = FBUtilities.getBroadcastAddress(); - StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null); + StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf")); StreamStateStore store = new StreamStateStore(); @@ -71,7 +72,7 @@ public class StreamStateStoreTest // add different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); - session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null); + session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true, null, PreviewKind.NONE); session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf")); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 97bd321..53f5ab3 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -56,6 +56,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -789,7 +790,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase List<StreamSession.SSTableStreamingSections> sectionsBeforeRewrite = StreamSession.getSSTableSectionsForRanges( Collections.singleton(new Range<Token>(firstToken, firstToken)), - Collections.singleton(cfs), null); + Collections.singleton(cfs), null, PreviewKind.NONE); assertEquals(1, sectionsBeforeRewrite.size()); for (StreamSession.SSTableStreamingSections section : sectionsBeforeRewrite) section.ref.release(); @@ -804,7 +805,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase while (!done.get()) { Set<Range<Token>> range = Collections.singleton(new Range<Token>(firstToken, firstToken)); - List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null); + List<StreamSession.SSTableStreamingSections> sections = StreamSession.getSSTableSectionsForRanges(range, Collections.singleton(cfs), null, PreviewKind.NONE); if (sections.size() != 1) failed.set(true); for (StreamSession.SSTableStreamingSections section : sections) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java index 1c508a0..d61d859 100644 --- a/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java +++ b/test/unit/org/apache/cassandra/repair/AbstractRepairTest.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDGen; @@ -85,7 +86,8 @@ public abstract class AbstractRepairTest Sets.newHashSet(RANGE1, RANGE2, RANGE3), isIncremental, repairedAt, - isGlobal); + isGlobal, + PreviewKind.NONE); return sessionId; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 75742dc..f5e9d6b 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -40,6 +40,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; @@ -91,7 +92,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(ep1, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -105,9 +106,10 @@ public class LocalSyncTaskTest extends AbstractRepairTest Keyspace keyspace = Keyspace.open(KEYSPACE1); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, - ActiveRepairService.UNREPAIRED_SSTABLE, false); + ActiveRepairService.UNREPAIRED_SSTABLE, false, + PreviewKind.NONE); RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", Arrays.asList(range)); @@ -128,7 +130,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, null, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); task.run(); // ensure that the changed range was recorded @@ -145,7 +147,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, NO_PENDING_REPAIR, false, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1)); assertEquals(NO_PENDING_REPAIR, plan.getPendingRepair()); @@ -162,7 +164,7 @@ public class LocalSyncTaskTest extends AbstractRepairTest TreeResponse r1 = new TreeResponse(PARTICIPANT1, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); TreeResponse r2 = new TreeResponse(PARTICIPANT2, createInitialTree(desc, DatabaseDescriptor.getPartitioner())); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, desc.parentSessionId, false, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(PARTICIPANT1, PARTICIPANT2, Lists.newArrayList(RANGE1)); assertEquals(desc.parentSessionId, plan.getPendingRepair()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java index 0260cd0..5a4e5b1 100644 --- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDGen; @@ -62,7 +63,10 @@ public class RepairSessionTest IPartitioner p = Murmur3Partitioner.instance; Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100))); Set<InetAddress> endpoints = Sets.newHashSet(remote); - RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, false, false, "Standard1"); + RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), + "Keyspace1", RepairParallelism.SEQUENTIAL, + endpoints, false, false, + PreviewKind.NONE, "Standard1"); // perform convict session.convict(remote, Double.MAX_VALUE); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java index 5f13e3d..f433f2e 100644 --- a/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/StreamingRepairTaskTest.java @@ -33,6 +33,7 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.utils.UUIDGen; @@ -64,8 +65,8 @@ public class StreamingRepairTaskTest extends AbstractRepairTest UUID sessionID = registerSession(cfs, true, true); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges()); - SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges()); - StreamingRepairTask task = new StreamingRepairTask(desc, request, desc.sessionId); + SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE); + StreamingRepairTask task = new StreamingRepairTask(desc, request, desc.sessionId, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(request.src, request.dst); Assert.assertFalse(plan.getFlushBeforeTransfer()); @@ -77,8 +78,8 @@ public class StreamingRepairTaskTest extends AbstractRepairTest UUID sessionID = registerSession(cfs, false, true); ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(sessionID); RepairJobDesc desc = new RepairJobDesc(sessionID, UUIDGen.getTimeUUID(), ks, tbl, prs.getRanges()); - SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges()); - StreamingRepairTask task = new StreamingRepairTask(desc, request, null); + SyncRequest request = new SyncRequest(desc, PARTICIPANT1, PARTICIPANT2, PARTICIPANT3, prs.getRanges(), PreviewKind.NONE); + StreamingRepairTask task = new StreamingRepairTask(desc, request, null, PreviewKind.NONE); StreamPlan plan = task.createStreamPlan(request.src, request.dst); Assert.assertTrue(plan.getFlushBeforeTransfer()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index bbcdbb8..b45edc1 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -50,6 +50,7 @@ import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.MerkleTree; import org.apache.cassandra.utils.MerkleTrees; @@ -98,7 +99,7 @@ public class ValidatorTest ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily); - Validator validator = new Validator(desc, remote, 0); + Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE); MerkleTrees tree = new MerkleTrees(partitioner); tree.addMerkleTrees((int) Math.pow(2, 15), validator.desc.ranges); validator.prepare(cfs, tree); @@ -135,7 +136,7 @@ public class ValidatorTest InetAddress remote = InetAddress.getByName("127.0.0.2"); - Validator validator = new Validator(desc, remote, 0); + Validator validator = new Validator(desc, remote, 0, PreviewKind.NONE); validator.fail(); MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); @@ -190,10 +191,10 @@ public class ValidatorTest ActiveRepairService.instance.registerParentRepairSession(repairSessionId, FBUtilities.getBroadcastAddress(), Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE, - false); + false, PreviewKind.NONE); final CompletableFuture<MessageOut> outgoingMessageSink = registerOutgoingMessageSink(); - Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false); + Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), 0, true, false, PreviewKind.NONE); CompactionManager.instance.submitValidation(cfs, validator); MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, TimeUnit.SECONDS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java index 26168ad..367fea9 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/AbstractConsistentSessionTest.java @@ -33,6 +33,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDGen; @@ -85,7 +86,8 @@ public abstract class AbstractConsistentSessionTest Sets.newHashSet(RANGE1, RANGE2, RANGE3), true, System.currentTimeMillis(), - true); + true, + PreviewKind.NONE); return sessionId; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cfaf855/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java index 2cb6326..2126835 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/PendingAntiCompactionTest.java @@ -52,6 +52,7 @@ import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -146,7 +147,7 @@ public class PendingAntiCompactionTest // create a session so the anti compaction can fine it UUID sessionID = UUIDGen.getTimeUUID(); - ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true); + ActiveRepairService.instance.registerParentRepairSession(sessionID, InetAddress.getLocalHost(), Lists.newArrayList(cfs), ranges, true, 1, true, PreviewKind.NONE); PendingAntiCompaction pac; ExecutorService executor = Executors.newSingleThreadExecutor();