Repository: cassandra Updated Branches: refs/heads/trunk 30820eacb -> c794d2bed
Skip building views during base table streams on range movements patch by Benjamin Roth; reviewed by Paulo Motta for CASSANDRA-13065 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c794d2be Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c794d2be Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c794d2be Branch: refs/heads/trunk Commit: c794d2bed7ca1d10e13c4da08a3d45f5c755c1d8 Parents: 30820ea Author: brstgt <[email protected]> Authored: Tue Feb 28 13:36:16 2017 +0100 Committer: Paulo Motta <[email protected]> Committed: Thu Apr 6 18:18:55 2017 -0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 5 +- .../org/apache/cassandra/dht/BootStrapper.java | 2 +- .../org/apache/cassandra/dht/RangeStreamer.java | 7 +- .../apache/cassandra/dht/StreamStateStore.java | 2 +- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../net/IncomingStreamingConnection.java | 2 +- .../apache/cassandra/repair/LocalSyncTask.java | 9 +- .../cassandra/repair/StreamingRepairTask.java | 3 +- .../cassandra/service/StorageService.java | 8 +- .../cassandra/streaming/ConnectionHandler.java | 2 +- .../apache/cassandra/streaming/StreamEvent.java | 4 +- .../cassandra/streaming/StreamOperation.java | 69 +++++++++++++++ .../apache/cassandra/streaming/StreamPlan.java | 18 ++-- .../cassandra/streaming/StreamReceiveTask.java | 93 ++++++++++++-------- .../cassandra/streaming/StreamResultFuture.java | 34 +++---- .../cassandra/streaming/StreamSession.java | 4 +- .../apache/cassandra/streaming/StreamState.java | 6 +- .../management/StreamStateCompositeData.java | 7 +- .../streaming/messages/StreamInitMessage.java | 13 +-- .../cassandra/tools/nodetool/NetStats.java | 2 +- .../apache/cassandra/dht/BootStrapperTest.java | 3 +- .../cassandra/io/sstable/LegacySSTableTest.java | 5 +- .../streaming/StreamOperationTest.java | 
47 ++++++++++ .../streaming/StreamTransferTaskTest.java | 2 +- .../streaming/StreamingTransferTest.java | 12 +-- 26 files changed, 254 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0e3da3f..b7b2cc9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Skip building views during base table streams on range movements (CASSANDRA-13065) * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197) * Remove deprecated repair JMX APIs (CASSANDRA-11530) * Fix version check to enable streaming keep-alive (CASSANDRA-12929) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 058b378..0d64a94 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -59,6 +59,7 @@ import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; +import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.*; @@ -1196,7 +1197,7 @@ public final class SystemKeyspace availableRanges.truncateBlocking(); } - public static synchronized void updateTransferredRanges(String description, + public static synchronized void updateTransferredRanges(StreamOperation streamOperation, InetAddress peer, String keyspace, Collection<Range<Token>> streamedRanges) @@ -1207,7 +1208,7 @@ public 
final class SystemKeyspace { rangesToUpdate.add(rangeToBytes(range)); } - executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, description, peer, keyspace); + executeInternal(format(cql, TRANSFERRED_RANGES), rangesToUpdate, streamOperation.getDescription(), peer, keyspace); } public static synchronized Map<InetAddress, Set<Range<Token>>> getTransferredRanges(String description, String keyspace, IPartitioner partitioner) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 9235844..a25f867 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -73,7 +73,7 @@ public class BootStrapper extends ProgressEventNotifierSupport RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, - "Bootstrap", + StreamOperation.BOOTSTRAP, useStrictConsistency, DatabaseDescriptor.getEndpointSnitch(), stateStore, http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 4d7c903..89a96cd 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -41,6 +41,7 @@ import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamResultFuture; +import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.FBUtilities; /** @@ -144,7 +145,7 @@ public class RangeStreamer 
public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, - String description, + StreamOperation streamOperation, boolean useStrictConsistency, IEndpointSnitch snitch, StreamStateStore stateStore, @@ -154,8 +155,8 @@ public class RangeStreamer this.metadata = metadata; this.tokens = tokens; this.address = address; - this.description = description; - this.streamPlan = new StreamPlan(description, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost, + this.description = streamOperation.getDescription(); + this.streamPlan = new StreamPlan(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, connectionsPerHost, true, false, connectSequentially, null); this.useStrictConsistency = useStrictConsistency; this.snitch = snitch; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/dht/StreamStateStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java index 47b3072..e3ea838 100644 --- a/src/java/org/apache/cassandra/dht/StreamStateStore.java +++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java @@ -69,7 +69,7 @@ public class StreamStateStore implements StreamEventHandler Set<String> keyspaces = se.transferredRangesPerKeyspace.keySet(); for (String keyspace : keyspaces) { - SystemKeyspace.updateTransferredRanges(se.description, se.peer, keyspace, se.transferredRangesPerKeyspace.get(keyspace)); + SystemKeyspace.updateTransferredRanges(se.streamOperation, se.peer, keyspace, se.transferredRangesPerKeyspace.get(keyspace)); } for (StreamRequest request : se.requests) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 7e79fa9..759fa0f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -159,7 +159,7 @@ public class SSTableLoader implements StreamEventHandler client.init(keyspace); outputHandler.output("Established connection to initial hosts"); - StreamPlan plan = new StreamPlan("Bulk Load", 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory()); + StreamPlan plan = new StreamPlan(StreamOperation.BULK_LOAD, 0, connectionsPerHost, false, false, false, null).connectionFactory(client.getConnectionFactory()); Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap(); openSSTables(endpointToRanges); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index 19bf3d4..eee0042 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -73,7 +73,7 @@ public class IncomingStreamingConnection extends Thread implements Closeable // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. // Note: we cannot use the same socket for incoming and outgoing streams because we want to // parallelize said streams and the socket is blocking, so we might deadlock. 
- StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.description, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair); + StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.isIncremental, init.pendingRepair); } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index 56411d9..3dd6532 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -33,6 +33,7 @@ import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.tracing.TraceState; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; @@ -79,10 +80,10 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler isIncremental = prs.isIncremental; } Tracing.traceRepair(message); - StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this) - .flushBeforeTransfer(true) - // request ranges from the remote node - .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily); + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, repairedAt, 1, false, isIncremental, false, pendingRepair).listeners(this) + .flushBeforeTransfer(true) + // request ranges from the remote node + .requestRanges(dst, preferred, 
desc.keyspace, differences, desc.columnFamily); if (!pullRepair) { // send ranges to the remote node if we are not performing a pull repair http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 6bce1fa..c5f3c95 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -32,6 +32,7 @@ import org.apache.cassandra.streaming.StreamEvent; import org.apache.cassandra.streaming.StreamEventHandler; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.streaming.StreamOperation; /** * StreamingRepairTask performs data streaming between two remote replica which neither is not repair coordinator. @@ -71,7 +72,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler @VisibleForTesting StreamPlan createStreamPlan(InetAddress dest, InetAddress preferred, boolean isIncremental) { - return new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false, isConsistent ? desc.parentSessionId : null) + return new StreamPlan(StreamOperation.REPAIR, repairedAt, 1, false, isIncremental, false, isConsistent ? 
desc.parentSessionId : null) .listeners(this) .flushBeforeTransfer(!isIncremental) // sstables are isolated at the beginning of an incremental repair session, so flushing isn't neccessary .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily) // request ranges from the remote node http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index abbc001..735c7cf 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1124,7 +1124,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE RangeStreamer streamer = new RangeStreamer(tokenMetadata, null, FBUtilities.getBroadcastAddress(), - "Rebuild", + StreamOperation.REBUILD, useStrictConsistency && !replacing, DatabaseDescriptor.getEndpointSnitch(), streamStateStore, @@ -2560,7 +2560,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } - StreamPlan stream = new StreamPlan("Restore replica count"); + StreamPlan stream = new StreamPlan(StreamOperation.RESTORE_REPLICA_COUNT); for (String keyspaceName : rangesToFetch.keySet()) { for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangesToFetch.get(keyspaceName)) @@ -3876,7 +3876,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private class RangeRelocator { - private final StreamPlan streamPlan = new StreamPlan("Relocation"); + private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION); private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames) { @@ -4690,7 +4690,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE 
sessionsToStreamByKeyspace.put(keyspace, rangesPerEndpoint); } - StreamPlan streamPlan = new StreamPlan("Unbootstrap"); + StreamPlan streamPlan = new StreamPlan(StreamOperation.DECOMMISSION); // Vinculate StreamStateStore to current StreamPlan to update transferred ranges per StreamSession streamPlan.listeners(streamStateStore); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java index 10c5827..91f1249 100644 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java @@ -195,7 +195,7 @@ public class ConnectionHandler FBUtilities.getBroadcastAddress(), session.sessionIndex(), session.planId(), - session.description(), + session.streamOperation(), !isOutgoingHandler, session.keepSSTableLevel(), session.isIncremental(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java index 49172fb..6ea2814 100644 --- a/src/java/org/apache/cassandra/streaming/StreamEvent.java +++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java @@ -52,7 +52,7 @@ public abstract class StreamEvent public final boolean success; public final int sessionIndex; public final Set<StreamRequest> requests; - public final String description; + public final StreamOperation streamOperation; public final Map<String, Set<Range<Token>>> transferredRangesPerKeyspace; public SessionCompleteEvent(StreamSession session) @@ -62,7 +62,7 @@ public abstract class StreamEvent this.success = 
session.isSuccess(); this.sessionIndex = session.sessionIndex(); this.requests = ImmutableSet.copyOf(session.requests); - this.description = session.description(); + this.streamOperation = session.streamOperation(); this.transferredRangesPerKeyspace = Collections.unmodifiableMap(session.transferredRangesPerKeyspace); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamOperation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOperation.java b/src/java/org/apache/cassandra/streaming/StreamOperation.java new file mode 100644 index 0000000..8151b47 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/StreamOperation.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.streaming; + +public enum StreamOperation +{ + OTHER("Other"), // Fallback to avoid null types when deserializing from string + RESTORE_REPLICA_COUNT("Restore replica count", false), // Handles removeNode + DECOMMISSION("Unbootstrap", false), + RELOCATION("Relocation", false), + BOOTSTRAP("Bootstrap", false), + REBUILD("Rebuild", false), + BULK_LOAD("Bulk Load"), + REPAIR("Repair"); + + private final String description; + private final boolean requiresViewBuild; + + + StreamOperation(String description) { + this(description, true); + } + + /** + * @param description The operation description + * @param requiresViewBuild Whether this operation requires views to be updated if it involves a base table + */ + StreamOperation(String description, boolean requiresViewBuild) { + this.description = description; + this.requiresViewBuild = requiresViewBuild; + } + + public static StreamOperation fromString(String text) { + for (StreamOperation b : StreamOperation.values()) { + if (b.description.equalsIgnoreCase(text)) { + return b; + } + } + + return OTHER; + } + + public String getDescription() { + return description; + } + + /** + * Whether this operation requires views to be updated + */ + public boolean requiresViewBuild() + { + return this.requiresViewBuild; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 5a2ce77..faaac0e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -34,7 +34,7 @@ public class StreamPlan { public static final String[] EMPTY_COLUMN_FAMILIES = new String[0]; private final UUID planId = UUIDGen.getTimeUUID(); - private final String description; + private final
StreamOperation streamOperation; private final List<StreamEventHandler> handlers = new ArrayList<>(); private final long repairedAt; private final StreamCoordinator coordinator; @@ -44,22 +44,22 @@ public class StreamPlan /** * Start building stream plan. * - * @param description Stream type that describes this StreamPlan + * @param streamOperation Stream streamOperation that describes this StreamPlan */ - public StreamPlan(String description) + public StreamPlan(StreamOperation streamOperation) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null); + this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, false, false, false, null); } - public StreamPlan(String description, boolean keepSSTableLevels, boolean connectSequentially) + public StreamPlan(StreamOperation streamOperation, boolean keepSSTableLevels, boolean connectSequentially) { - this(description, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null); + this(streamOperation, ActiveRepairService.UNREPAIRED_SSTABLE, 1, keepSSTableLevels, false, connectSequentially, null); } - public StreamPlan(String description, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, + public StreamPlan(StreamOperation streamOperation, long repairedAt, int connectionsPerHost, boolean keepSSTableLevels, boolean isIncremental, boolean connectSequentially, UUID pendingRepair) { - this.description = description; + this.streamOperation = streamOperation; this.repairedAt = repairedAt; this.coordinator = new StreamCoordinator(connectionsPerHost, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), connectSequentially, pendingRepair); @@ -187,7 +187,7 @@ public class StreamPlan */ public StreamResultFuture execute() { - return StreamResultFuture.init(planId, description, handlers, coordinator); + return StreamResultFuture.init(planId, streamOperation, handlers, coordinator); } /** 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index d0c4d50..b7e475a 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -32,6 +32,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; @@ -147,11 +148,61 @@ public class StreamReceiveTask extends StreamTask this.task = task; } + /* + * We have a special path for views and for CDC. + * + * For views, since the view requires cleaning up any pre-existing state, we must put all partitions + * through the same write path as normal mutations. This also ensures any 2is are also updated. + * + * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they + * can be archived by the CDC process on discard. 
+ */ + private boolean requiresWritePath(ColumnFamilyStore cfs) { + return hasCDC(cfs) || (task.session.streamOperation().requiresViewBuild() && hasViews(cfs)); + } + + private boolean hasViews(ColumnFamilyStore cfs) + { + return !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName())); + } + + private boolean hasCDC(ColumnFamilyStore cfs) + { + return cfs.metadata().params.cdc; + } + + Mutation createMutation(ColumnFamilyStore cfs, UnfilteredRowIterator rowIterator) + { + return new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata()))); + } + + private void sendThroughWritePath(ColumnFamilyStore cfs, Collection<SSTableReader> readers) { + boolean hasCdc = hasCDC(cfs); + for (SSTableReader reader : readers) + { + Keyspace ks = Keyspace.open(reader.getKeyspaceName()); + try (ISSTableScanner scanner = reader.getScanner()) + { + while (scanner.hasNext()) + { + try (UnfilteredRowIterator rowIterator = scanner.next()) + { + // MV *can* be applied unsafe if there's no CDC on the CFS as we flush + // before transaction is done. + // + // If the CFS has CDC, however, these updates need to be written to the CommitLog + // so they get archived into the cdc_raw folder + ks.apply(createMutation(cfs, rowIterator), hasCdc, true, false); + } + } + } + } + } + public void run() { - boolean hasViews = false; - boolean hasCDC = false; ColumnFamilyStore cfs = null; + boolean requiresWritePath = false; try { cfs = ColumnFamilyStore.getIfExists(task.tableId); @@ -163,45 +214,15 @@ public class StreamReceiveTask extends StreamTask task.session.taskCompleted(task); return; } - hasViews = !Iterables.isEmpty(View.findAll(cfs.metadata.keyspace, cfs.getTableName())); - hasCDC = cfs.metadata().params.cdc; + requiresWritePath = requiresWritePath(cfs); Collection<SSTableReader> readers = task.sstables; try (Refs<SSTableReader> refs = Refs.ref(readers)) { - /* - * We have a special path for views and for CDC. 
- * - * For views, since the view requires cleaning up any pre-existing state, we must put all partitions - * through the same write path as normal mutations. This also ensures any 2is are also updated. - * - * For CDC-enabled tables, we want to ensure that the mutations are run through the CommitLog so they - * can be archived by the CDC process on discard. - */ - if (hasViews || hasCDC) + if (requiresWritePath) { - for (SSTableReader reader : readers) - { - Keyspace ks = Keyspace.open(reader.getKeyspaceName()); - try (ISSTableScanner scanner = reader.getScanner()) - { - while (scanner.hasNext()) - { - try (UnfilteredRowIterator rowIterator = scanner.next()) - { - Mutation m = new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata()))); - - // MV *can* be applied unsafe if there's no CDC on the CFS as we flush below - // before transaction is done. - // - // If the CFS has CDC, however, these updates need to be written to the CommitLog - // so they get archived into the cdc_raw folder - ks.apply(m, hasCDC, true, false); - } - } - } - } + sendThroughWritePath(cfs, readers); } else { @@ -249,7 +270,7 @@ public class StreamReceiveTask extends StreamTask { // We don't keep the streamed sstables since we've applied them manually so we abort the txn and delete // the streamed sstables. 
- if (hasViews || hasCDC) + if (requiresWritePath) { if (cfs != null) cfs.forceBlockingFlush(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamResultFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java index 6d0c03b..4890b63 100644 --- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java +++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java @@ -48,22 +48,22 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> private static final Logger logger = LoggerFactory.getLogger(StreamResultFuture.class); public final UUID planId; - public final String description; + public final StreamOperation streamOperation; private final StreamCoordinator coordinator; private final Collection<StreamEventHandler> eventListeners = new ConcurrentLinkedQueue<>(); /** - * Create new StreamResult of given {@code planId} and type. + * Create new StreamResult of given {@code planId} and streamOperation. * * Constructor is package private. You need to use {@link StreamPlan#execute()} to get the instance. 
* * @param planId Stream plan ID - * @param description Stream description + * @param streamOperation Stream streamOperation */ - private StreamResultFuture(UUID planId, String description, StreamCoordinator coordinator) + private StreamResultFuture(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) { this.planId = planId; - this.description = description; + this.streamOperation = streamOperation; this.coordinator = coordinator; // if there is no session to listen to, we immediately set result for returning @@ -71,22 +71,22 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> set(getCurrentState()); } - private StreamResultFuture(UUID planId, String description, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair) + private StreamResultFuture(UUID planId, StreamOperation streamOperation, boolean keepSSTableLevels, boolean isIncremental, UUID pendingRepair) { - this(planId, description, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair)); + this(planId, streamOperation, new StreamCoordinator(0, keepSSTableLevels, isIncremental, new DefaultConnectionFactory(), false, pendingRepair)); } - static StreamResultFuture init(UUID planId, String description, Collection<StreamEventHandler> listeners, + static StreamResultFuture init(UUID planId, StreamOperation streamOperation, Collection<StreamEventHandler> listeners, StreamCoordinator coordinator) { - StreamResultFuture future = createAndRegister(planId, description, coordinator); + StreamResultFuture future = createAndRegister(planId, streamOperation, coordinator); if (listeners != null) { for (StreamEventHandler listener : listeners) future.addEventListener(listener); } - logger.info("[Stream #{}] Executing streaming plan for {}", planId, description); + logger.info("[Stream #{}] Executing streaming plan for {}", planId, streamOperation.getDescription()); // Initialize and start all sessions for 
(final StreamSession session : coordinator.getAllStreamSessions()) @@ -101,7 +101,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> public static synchronized StreamResultFuture initReceivingSide(int sessionIndex, UUID planId, - String description, + StreamOperation streamOperation, InetAddress from, IncomingStreamingConnection connection, boolean isForOutgoing, @@ -113,20 +113,20 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> StreamResultFuture future = StreamManager.instance.getReceivingStream(planId); if (future == null) { - logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, description); + logger.info("[Stream #{} ID#{}] Creating new streaming plan for {}", planId, sessionIndex, streamOperation.getDescription()); // The main reason we create a StreamResultFuture on the receiving side is for JMX exposure. - future = new StreamResultFuture(planId, description, keepSSTableLevel, isIncremental, pendingRepair); + future = new StreamResultFuture(planId, streamOperation, keepSSTableLevel, isIncremental, pendingRepair); StreamManager.instance.registerReceiving(future); } future.attachConnection(from, sessionIndex, connection, isForOutgoing, version); - logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, description); + logger.info("[Stream #{}, ID#{}] Received streaming plan for {}", planId, sessionIndex, streamOperation.getDescription()); return future; } - private static StreamResultFuture createAndRegister(UUID planId, String description, StreamCoordinator coordinator) + private static StreamResultFuture createAndRegister(UUID planId, StreamOperation streamOperation, StreamCoordinator coordinator) { - StreamResultFuture future = new StreamResultFuture(planId, description, coordinator); + StreamResultFuture future = new StreamResultFuture(planId, streamOperation, coordinator); StreamManager.instance.register(future); return future; } 
@@ -149,7 +149,7 @@ public final class StreamResultFuture extends AbstractFuture<StreamState> */ public StreamState getCurrentState() { - return new StreamState(planId, description, coordinator.getAllSessionInfo()); + return new StreamState(planId, streamOperation, coordinator.getAllSessionInfo()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index bfae0bf..62fa317 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -212,9 +212,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber return index; } - public String description() + public StreamOperation streamOperation() { - return streamResult == null ? null : streamResult.description; + return streamResult == null ? 
null : streamResult.streamOperation; } public boolean keepSSTableLevel() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/StreamState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamState.java b/src/java/org/apache/cassandra/streaming/StreamState.java index db50c2a..4ee3c8d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamState.java +++ b/src/java/org/apache/cassandra/streaming/StreamState.java @@ -30,14 +30,14 @@ import com.google.common.collect.Iterables; public class StreamState implements Serializable { public final UUID planId; - public final String description; + public final StreamOperation streamOperation; public final Set<SessionInfo> sessions; - public StreamState(UUID planId, String description, Set<SessionInfo> sessions) + public StreamState(UUID planId, StreamOperation streamOperation, Set<SessionInfo> sessions) { this.planId = planId; - this.description = description; this.sessions = sessions; + this.streamOperation = streamOperation; } public boolean hasFailedSession() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java index e25ab1a..de88762 100644 --- a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java +++ b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import org.apache.cassandra.streaming.SessionInfo; import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.streaming.StreamOperation; /** */ @@ -73,7 +74,7 @@ public class 
StreamStateCompositeData { Map<String, Object> valueMap = new HashMap<>(); valueMap.put(ITEM_NAMES[0], streamState.planId.toString()); - valueMap.put(ITEM_NAMES[1], streamState.description); + valueMap.put(ITEM_NAMES[1], streamState.streamOperation.getDescription()); CompositeData[] sessions = new CompositeData[streamState.sessions.size()]; Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>() @@ -121,7 +122,7 @@ public class StreamStateCompositeData assert cd.getCompositeType().equals(COMPOSITE_TYPE); Object[] values = cd.getAll(ITEM_NAMES); UUID planId = UUID.fromString((String) values[0]); - String description = (String) values[1]; + String typeString = (String) values[1]; Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]), new Function<CompositeData, SessionInfo>() { @@ -130,6 +131,6 @@ public class StreamStateCompositeData return SessionInfoCompositeData.fromCompositeData(input); } })); - return new StreamState(planId, description, sessions); + return new StreamState(planId, StreamOperation.fromString(typeString), sessions); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java index 3b4b512..59f28e0 100644 --- a/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java +++ b/src/java/org/apache/cassandra/streaming/messages/StreamInitMessage.java @@ -30,6 +30,7 @@ import org.apache.cassandra.io.util.DataOutputBufferFixed; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.MessagingService; +import 
org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.UUIDSerializer; /** @@ -43,7 +44,7 @@ public class StreamInitMessage public final InetAddress from; public final int sessionIndex; public final UUID planId; - public final String description; + public final StreamOperation streamOperation; // true if this init message is to connect for outgoing message on receiving side public final boolean isForOutgoing; @@ -51,12 +52,12 @@ public class StreamInitMessage public final boolean isIncremental; public final UUID pendingRepair; - public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, String description, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair) + public StreamInitMessage(InetAddress from, int sessionIndex, UUID planId, StreamOperation streamOperation, boolean isForOutgoing, boolean keepSSTableLevel, boolean isIncremental, UUID pendingRepair) { this.from = from; this.sessionIndex = sessionIndex; this.planId = planId; - this.description = description; + this.streamOperation = streamOperation; this.isForOutgoing = isForOutgoing; this.keepSSTableLevel = keepSSTableLevel; this.isIncremental = isIncremental; @@ -112,7 +113,7 @@ public class StreamInitMessage CompactEndpointSerializationHelper.serialize(message.from, out); out.writeInt(message.sessionIndex); UUIDSerializer.serializer.serialize(message.planId, out, MessagingService.current_version); - out.writeUTF(message.description); + out.writeUTF(message.streamOperation.getDescription()); out.writeBoolean(message.isForOutgoing); out.writeBoolean(message.keepSSTableLevel); out.writeBoolean(message.isIncremental); @@ -135,7 +136,7 @@ public class StreamInitMessage boolean isIncremental = in.readBoolean(); UUID pendingRepair = in.readBoolean() ? 
UUIDSerializer.serializer.deserialize(in, version) : null; - return new StreamInitMessage(from, sessionIndex, planId, description, sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair); + return new StreamInitMessage(from, sessionIndex, planId, StreamOperation.fromString(description), sentByInitiator, keepSSTableLevel, isIncremental, pendingRepair); } public long serializedSize(StreamInitMessage message, int version) @@ -143,7 +144,7 @@ public class StreamInitMessage long size = CompactEndpointSerializationHelper.serializedSize(message.from); size += TypeSizes.sizeof(message.sessionIndex); size += UUIDSerializer.serializer.serializedSize(message.planId, MessagingService.current_version); - size += TypeSizes.sizeof(message.description); + size += TypeSizes.sizeof(message.streamOperation.getDescription()); size += TypeSizes.sizeof(message.isForOutgoing); size += TypeSizes.sizeof(message.keepSSTableLevel); size += TypeSizes.sizeof(message.isIncremental); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/src/java/org/apache/cassandra/tools/nodetool/NetStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java index c171a3e..1b7a63b 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java +++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java @@ -47,7 +47,7 @@ public class NetStats extends NodeToolCmd System.out.println("Not sending any streams."); for (StreamState status : statuses) { - System.out.printf("%s %s%n", status.description, status.planId.toString()); + System.out.printf("%s %s%n", status.streamOperation.getDescription(), status.planId.toString()); for (SessionInfo info : status.sessions) { System.out.printf(" %s", info.peer.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/dht/BootStrapperTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 9481201..a1054bb 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -52,6 +52,7 @@ import org.apache.cassandra.locator.RackInferringSnitch; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.FBUtilities; @RunWith(OrderedJUnit4ClassRunner.class) @@ -98,7 +99,7 @@ public class BootStrapperTest InetAddress myEndpoint = InetAddress.getByName("127.0.0.1"); assertEquals(numOldNodes, tmd.sortedTokens().size()); - RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1); + RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), false, 1); IFailureDetector mockFailureDetector = new IFailureDetector() { public boolean isAlive(InetAddress ep) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 3a963b1..944e320 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -52,6 +52,7 @@ import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import 
org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -194,8 +195,8 @@ public class LegacySSTableTest details.add(new StreamSession.SSTableStreamingSections(sstable.ref(), sstable.getPositionsForRanges(ranges), sstable.estimatedKeysForRanges(ranges), sstable.getSSTableMetadata().repairedAt)); - new StreamPlan("LegacyStreamingTest").transferFiles(FBUtilities.getBroadcastAddress(), details) - .execute().get(); + new StreamPlan(StreamOperation.OTHER).transferFiles(FBUtilities.getBroadcastAddress(), details) + .execute().get(); } private static void truncateLegacyTables(String legacyVersion) throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java b/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java new file mode 100644 index 0000000..2cc216e --- /dev/null +++ b/test/unit/org/apache/cassandra/streaming/StreamOperationTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.streaming; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class StreamOperationTest +{ + @Test + public void testSerialization() + { + // Unknown descriptions fall back to OTHER + assertEquals(StreamOperation.OTHER, StreamOperation.fromString("Foobar")); + assertEquals(StreamOperation.OTHER, StreamOperation.fromString("Other")); + assertEquals(StreamOperation.RESTORE_REPLICA_COUNT, StreamOperation.fromString("Restore replica count")); + assertEquals(StreamOperation.DECOMMISSION, StreamOperation.fromString("Unbootstrap")); + assertEquals(StreamOperation.RELOCATION, StreamOperation.fromString("Relocation")); + assertEquals(StreamOperation.BOOTSTRAP, StreamOperation.fromString("Bootstrap")); + assertEquals(StreamOperation.REBUILD, StreamOperation.fromString("Rebuild")); + assertEquals(StreamOperation.BULK_LOAD, StreamOperation.fromString("Bulk Load")); + assertEquals(StreamOperation.REPAIR, StreamOperation.fromString("Repair")); + // Test case insensitivity + assertEquals(StreamOperation.REPAIR, StreamOperation.fromString("rEpair")); + + // Test description + assertEquals("Repair", StreamOperation.REPAIR.getDescription()); + assertEquals("Restore replica count", StreamOperation.RESTORE_REPLICA_COUNT.getDescription()); + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java ---------------------------------------------------------------------- diff --git
a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java index 682e039..ce8d2dd 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java @@ -121,7 +121,7 @@ public class StreamTransferTaskTest { InetAddress peer = FBUtilities.getBroadcastAddress(); StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null, false, null); - StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator); + StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator); StreamSession session = new StreamSession(peer, peer, null, 0, true, false, null); session.init(future); ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c794d2be/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index 1fa71f5..36329f4 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -114,14 +114,14 @@ public class StreamingTransferTest @Test public void testEmptyStreamPlan() throws Exception { - StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest").execute(); + StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER).execute(); final UUID planId = futureResult.planId; Futures.addCallback(futureResult, new FutureCallback<StreamState>() { public void onSuccess(StreamState result) { 
assert planId.equals(result.planId); - assert result.description.equals("StreamingTransferTest"); + assert result.streamOperation == StreamOperation.OTHER; assert result.sessions.isEmpty(); } @@ -143,14 +143,14 @@ public class StreamingTransferTest ranges.add(new Range<>(p.getMinimumToken(), p.getToken(ByteBufferUtil.bytes("key1")))); ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); - StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest") + StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER) .requestRanges(LOCAL, LOCAL, KEYSPACE2, ranges) .execute(); UUID planId = futureResult.planId; StreamState result = futureResult.get(); assert planId.equals(result.planId); - assert result.description.equals("StreamingTransferTest"); + assert result.streamOperation == StreamOperation.OTHER; // we should have completed session with empty transfer assert result.sessions.size() == 1; @@ -238,7 +238,7 @@ public class StreamingTransferTest List<Range<Token>> ranges = new ArrayList<>(); // wrapped range ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0")))); - StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getTableName()); + StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getTableName()); streamPlan.execute().get(); verifyConnectionsAreClosed(); @@ -256,7 +256,7 @@ public class StreamingTransferTest private void transfer(SSTableReader sstable, List<Range<Token>> ranges) throws Exception { - StreamPlan streamPlan = new StreamPlan("StreamingTransferTest").transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))); + StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferFiles(LOCAL, makeStreamingDetails(ranges, Refs.tryRef(Arrays.asList(sstable)))); 
streamPlan.execute().get(); verifyConnectionsAreClosed();
