This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e87a1e0 Expose streaming as a vtable
e87a1e0 is described below
commit e87a1e0c0a19c64ed2edc2d340c0f8af16776e2c
Author: David Capwell <[email protected]>
AuthorDate: Tue Mar 1 13:15:18 2022 -0800
Expose streaming as a vtable
patch by David Capwell; reviewed by Dinesh Joshi, Paulo Motta for
CASSANDRA-17390
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 28 ++
.../org/apache/cassandra/config/DurationSpec.java | 24 ++
.../cassandra/db/virtual/AbstractVirtualTable.java | 6 +
.../apache/cassandra/db/virtual/SimpleDataSet.java | 6 +-
.../db/virtual/StreamingVirtualTable.java | 109 ++++++
.../cassandra/db/virtual/SystemViewsKeyspace.java | 1 +
.../apache/cassandra/service/CassandraDaemon.java | 2 +
.../apache/cassandra/streaming/SessionInfo.java | 11 +-
.../apache/cassandra/streaming/StreamManager.java | 145 ++++++-
.../cassandra/streaming/StreamResultFuture.java | 13 +-
.../apache/cassandra/streaming/StreamingState.java | 432 +++++++++++++++++++++
.../management/StreamEventJMXNotifier.java | 2 +-
.../tools/nodetool/formatter/TableBuilder.java | 20 +
.../cassandra/distributed/impl/Instance.java | 3 +
.../test/streaming/RebuildStreamingTest.java | 96 +++++
.../distributed/util/QueryResultUtil.java | 70 +++-
test/unit/org/apache/cassandra/cql3/CQLTester.java | 17 +-
.../unit/org/apache/cassandra/db/MmapFileTest.java | 10 +
.../cassandra/db/filter/ColumnFilterTest.java | 2 +
.../db/virtual/StreamingVirtualTableTest.java | 219 +++++++++++
.../org/apache/cassandra/io/util/FileTest.java | 4 +
.../AbstractFilesystemOwnershipCheckTest.java | 9 +
.../apache/cassandra/utils/binlog/BinLogTest.java | 9 +
25 files changed, 1214 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 5580102..97ce6c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Expose streaming as a vtable (CASSANDRA-17390)
* Make startup checks configurable (CASSANDRA-17220)
* Add guardrail for number of partition keys on IN queries (CASSANDRA-17186)
* update Python test framework from nose to pytest (CASSANDRA-17293)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 15226cf..964a5e1 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -768,6 +768,8 @@ public class Config
public volatile Set<String> table_properties_disallowed =
Collections.emptySet();
public volatile boolean user_timestamps_enabled = true;
public volatile boolean read_before_write_list_operations_enabled = true;
+ public volatile DurationSpec streaming_state_expires =
DurationSpec.inDays(3);
+ public volatile DataStorageSpec streaming_state_size =
DataStorageSpec.inMebibytes(40);
/** The configuration of startup checks. */
public volatile Map<StartupCheckType, Map<String, Object>> startup_checks
= new HashMap<>();
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9f88ce0..65cb3d6 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -3885,4 +3885,32 @@ public class DatabaseDescriptor
conf.force_new_prepared_statement_behaviour = value;
}
}
+
+ public static DurationSpec getStreamingStateExpires()
+ {
+ return conf.streaming_state_expires;
+ }
+
+ public static void setStreamingStateExpires(DurationSpec duration)
+ {
+ if
(!conf.streaming_state_expires.equals(Objects.requireNonNull(duration,
"duration")))
+ {
+ logger.info("Setting streaming_state_expires to {}", duration);
+ conf.streaming_state_expires = duration;
+ }
+ }
+
+ public static DataStorageSpec getStreamingStateSize()
+ {
+ return conf.streaming_state_size;
+ }
+
+ public static void setStreamingStateSize(DataStorageSpec duration)
+ {
+ if (!conf.streaming_state_size.equals(Objects.requireNonNull(duration,
"duration")))
+ {
+ logger.info("Setting streaming_state_size to {}", duration);
+ conf.streaming_state_size = duration;
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/config/DurationSpec.java
b/src/java/org/apache/cassandra/config/DurationSpec.java
index f06c247..e5edb35 100644
--- a/src/java/org/apache/cassandra/config/DurationSpec.java
+++ b/src/java/org/apache/cassandra/config/DurationSpec.java
@@ -129,6 +129,19 @@ public class DurationSpec
}
}
+ // get vs no-get prefix is not consistent in the code base, but for
classes involved with config parsing, it is
+ // imporant to be explicit about get/set as this changes how parsing is
done; this class is a data-type, so is
+ // not nested, having get/set can confuse parsing thinking this is a
nested type
+ public long quantity()
+ {
+ return quantity;
+ }
+
+ public TimeUnit unit()
+ {
+ return unit;
+ }
+
/**
* Creates a {@code DurationSpec} of the specified amount of milliseconds.
*
@@ -179,6 +192,17 @@ public class DurationSpec
}
/**
+ * Creates a {@code DurationSpec} of the specified amount of days.
+ *
+ * @param days the amount of days
+ * @return a duration
+ */
+ public static DurationSpec inDays(long days)
+ {
+ return new DurationSpec(days, DAYS);
+ }
+
+ /**
* Creates a {@code DurationSpec} of the specified amount of seconds.
Custom method for special cases.
*
* @param value which can be in the old form only presenting the quantity
or the post CASSANDRA-15234 form - a
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
index b20ac43..cf90c42 100644
--- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -135,6 +135,12 @@ public abstract class AbstractVirtualTable implements
VirtualTable
throw new InvalidRequestException("Truncation is not supported by
table " + metadata);
}
+ @Override
+ public String toString()
+ {
+ return metadata().toString();
+ }
+
public interface DataSet
{
boolean isEmpty();
diff --git a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
index b8cb9f5..4150783 100644
--- a/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
+++ b/src/java/org/apache/cassandra/db/virtual/SimpleDataSet.java
@@ -176,8 +176,10 @@ public class SimpleDataSet extends
AbstractVirtualTable.AbstractDataSet
private void add(String columnName, Object value)
{
ColumnMetadata column =
metadata.getColumn(ByteBufferUtil.bytes(columnName));
- if (null == column || !column.isRegular())
- throw new IllegalArgumentException();
+ if (column == null)
+ throw new IllegalArgumentException("Unknown column: " +
columnName);
+ if (!column.isRegular())
+ throw new IllegalArgumentException(String.format("Expect a
regular column %s, but got %s", columnName, column.kind));
values.put(column, value);
}
diff --git
a/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java
b/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java
new file mode 100644
index 0000000..1036f18
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/StreamingVirtualTable.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamingState;
+
+import static
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
+
+public class StreamingVirtualTable extends AbstractVirtualTable
+{
+ public StreamingVirtualTable(String keyspace)
+ {
+ super(parse("CREATE TABLE streaming (" +
+ " id uuid,\n" +
+ " follower boolean,\n" +
+ " operation text, \n" +
+ " peers frozen<list<text>>,\n" +
+ " status text,\n" +
+ " progress_percentage float,\n" +
+ " last_updated_at timestamp,\n" +
+ " duration_millis bigint,\n" +
+ " failure_cause text,\n" +
+ " success_message text,\n" +
+ "\n" +
+ StreamingState.Sessions.columns() +
+ "\n" +
+ stateColumns() +
+ "\n" +
+ "PRIMARY KEY ((id))" +
+ ")", keyspace)
+ .kind(TableMetadata.Kind.VIRTUAL)
+ .build());
+ }
+
+ private static String stateColumns()
+ {
+ StringBuilder sb = new StringBuilder();
+ for (StreamingState.Status state : StreamingState.Status.values())
+ sb.append("
status_").append(state.name().toLowerCase()).append("_timestamp timestamp,\n");
+ return sb.toString();
+ }
+
+ @Override
+ public DataSet data()
+ {
+ SimpleDataSet result = new SimpleDataSet(metadata());
+ StreamManager.instance.getStreamingStates()
+ .forEach(s -> updateDataSet(result, s));
+ return result;
+ }
+
+ @Override
+ public DataSet data(DecoratedKey partitionKey)
+ {
+ UUID id = UUIDType.instance.compose(partitionKey.getKey());
+ SimpleDataSet result = new SimpleDataSet(metadata());
+ StreamingState state = StreamManager.instance.getStreamingState(id);
+ if (state != null)
+ updateDataSet(result, state);
+ return result;
+ }
+
+ private void updateDataSet(SimpleDataSet ds, StreamingState state)
+ {
+ ds.row(state.id());
+ ds.column("last_updated_at", new Date(state.lastUpdatedAtMillis()));
// read early to see latest state
+ ds.column("follower", state.follower());
+ ds.column("operation", state.operation().getDescription());
+ ds.column("peers",
state.peers().stream().map(Object::toString).collect(Collectors.toList()));
+ ds.column("status", state.status().name().toLowerCase());
+ ds.column("progress_percentage", round(state.progress() * 100));
+ ds.column("duration_millis", state.durationMillis());
+ ds.column("failure_cause", state.failureCause());
+ ds.column("success_message", state.successMessage());
+ for (Map.Entry<StreamingState.Status, Long> e :
state.stateTimesMillis().entrySet())
+ ds.column("status_" + e.getKey().name().toLowerCase() +
"_timestamp", new Date(e.getValue()));
+
+ state.sessions().update(ds);
+ }
+
+ static float round(float value)
+ {
+ return Math.round(value * 100) / 100;
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 6fe189e..fc0f40a 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -45,6 +45,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
.add(new RolesCacheKeysTable(VIRTUAL_VIEWS))
.add(new CQLMetricsTable(VIRTUAL_VIEWS))
.add(new BatchMetricsTable(VIRTUAL_VIEWS))
+ .add(new StreamingVirtualTable(VIRTUAL_VIEWS))
.build());
}
}
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 82eadbc..67195d7 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -77,6 +77,7 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.security.ThreadAwareSecurityManager;
+import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JMXServerUtils;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -370,6 +371,7 @@ public class CassandraDaemon
ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SizeEstimatesRecorder.instance,
30, sizeRecorderInterval, TimeUnit.SECONDS);
ActiveRepairService.instance.start();
+ StreamManager.instance.start();
// Prepared statements
QueryProcessor.instance.preloadPreparedStatements();
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java
b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index 31cff9b..55f1e44 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -44,8 +44,8 @@ public final class SessionInfo implements Serializable
/** Current session state */
public final StreamSession.State state;
- private final Map<String, ProgressInfo> receivingFiles;
- private final Map<String, ProgressInfo> sendingFiles;
+ private final Map<String, ProgressInfo> receivingFiles = new
ConcurrentHashMap<>();
+ private final Map<String, ProgressInfo> sendingFiles = new
ConcurrentHashMap<>();
public SessionInfo(InetSocketAddress peer,
int sessionIndex,
@@ -59,11 +59,14 @@ public final class SessionInfo implements Serializable
this.connecting = connecting;
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
- this.receivingFiles = new ConcurrentHashMap<>();
- this.sendingFiles = new ConcurrentHashMap<>();
this.state = state;
}
+ public SessionInfo(SessionInfo other)
+ {
+ this(other.peer, other.sessionIndex, other.connecting,
other.receivingSummaries, other.sendingSummaries, other.state);
+ }
+
public boolean isFailed()
{
return state == StreamSession.State.FAILED;
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index bec112b..86b42ce 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -17,9 +17,11 @@
*/
package org.apache.cassandra.streaming;
+import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
import javax.management.NotificationFilter;
@@ -28,12 +30,18 @@ import javax.management.openmbean.CompositeData;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
import org.apache.cassandra.streaming.management.StreamStateCompositeData;
@@ -45,6 +53,8 @@ import
org.apache.cassandra.streaming.management.StreamStateCompositeData;
*/
public class StreamManager implements StreamManagerMBean
{
+ private static final Logger logger =
LoggerFactory.getLogger(StreamManager.class);
+
public static final StreamManager instance = new StreamManager();
/**
@@ -204,6 +214,7 @@ public class StreamManager implements StreamManagerMBean
}
private final StreamEventJMXNotifier notifier = new
StreamEventJMXNotifier();
+ private final CopyOnWriteArrayList<StreamListener> listeners = new
CopyOnWriteArrayList<>();
/*
* Currently running streams. Removed after completion/failure.
@@ -213,6 +224,83 @@ public class StreamManager implements StreamManagerMBean
private final Map<UUID, StreamResultFuture> initiatorStreams = new
NonBlockingHashMap<>();
private final Map<UUID, StreamResultFuture> followerStreams = new
NonBlockingHashMap<>();
+ private final Cache<UUID, StreamingState> states;
+ private final StreamListener listener = new StreamListener()
+ {
+ @Override
+ public void onRegister(StreamResultFuture result)
+ {
+ // reason for synchronized rather than states.get is to detect
duplicates
+ // streaming shouldn't be producing duplicates as that would imply
a planId collision
+ synchronized (states)
+ {
+ StreamingState previous = states.getIfPresent(result.planId);
+ if (previous == null)
+ {
+ StreamingState state = new StreamingState(result);
+ states.put(state.id(), state);
+ state.phase.start();
+ result.addEventListener(state);
+ }
+ else
+ {
+ logger.warn("Duplicate streaming states detected for id
{}", result.planId);
+ }
+ }
+ }
+ };
+
+ public StreamManager()
+ {
+ DurationSpec duration = DatabaseDescriptor.getStreamingStateExpires();
+ long sizeBytes = DatabaseDescriptor.getStreamingStateSize().toBytes();
+ long numElements = sizeBytes / StreamingState.ELEMENT_SIZE;
+ logger.info("Storing streaming state for {} or for {} elements",
duration, numElements);
+ states = CacheBuilder.newBuilder()
+ .expireAfterWrite(duration.quantity(),
duration.unit())
+ .maximumSize(numElements)
+ .build();
+ }
+
+ public void start()
+ {
+ addListener(listener);
+ }
+
+ public void stop()
+ {
+ removeListener(listener);
+ }
+
+ public Collection<StreamingState> getStreamingStates()
+ {
+ return states.asMap().values();
+ }
+
+ public StreamingState getStreamingState(UUID id)
+ {
+ return states.getIfPresent(id);
+ }
+
+ @VisibleForTesting
+ public void putStreamingState(StreamingState state)
+ {
+ synchronized (states)
+ {
+ StreamingState previous = states.getIfPresent(state.id());
+ if (previous != null)
+ throw new AssertionError("StreamPlan id " + state.id() + "
already exists");
+ states.put(state.id(), state);
+ }
+ }
+
+ @VisibleForTesting
+ public void clearStates()
+ {
+ // states.cleanUp() doesn't clear, it looks to only run gc on things
that could be removed... this method should remove all state
+ states.asMap().clear();
+ }
+
public Set<CompositeData> getCurrentStreams()
{
return
Sets.newHashSet(Iterables.transform(Iterables.concat(initiatorStreams.values(),
followerStreams.values()), new Function<StreamResultFuture, CompositeData>()
@@ -231,6 +319,7 @@ public class StreamManager implements StreamManagerMBean
result.addListener(() -> initiatorStreams.remove(result.planId));
initiatorStreams.put(result.planId, result);
+ notifySafeOnRegister(result);
}
public StreamResultFuture registerFollower(final StreamResultFuture result)
@@ -240,7 +329,51 @@ public class StreamManager implements StreamManagerMBean
result.addListener(() -> followerStreams.remove(result.planId));
StreamResultFuture previous =
followerStreams.putIfAbsent(result.planId, result);
- return previous == null ? result : previous;
+ if (previous == null)
+ {
+ notifySafeOnRegister(result);
+ return result;
+ }
+ return previous;
+ }
+
+ @VisibleForTesting
+ public void putInitiatorStream(StreamResultFuture future)
+ {
+ StreamResultFuture current =
initiatorStreams.putIfAbsent(future.planId, future);
+ assert current == null: "Duplicat initiator stream for " +
future.planId;
+ }
+
+ @VisibleForTesting
+ public void putFollowerStream(StreamResultFuture future)
+ {
+ StreamResultFuture current =
followerStreams.putIfAbsent(future.planId, future);
+ assert current == null: "Duplicate follower stream for " +
future.planId;
+ }
+
+ public void addListener(StreamListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ public void removeListener(StreamListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ private void notifySafeOnRegister(StreamResultFuture result)
+ {
+ for (StreamListener l : listeners)
+ {
+ try
+ {
+ l.onRegister(result);
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Failed to notify stream listener of new
Initiator/Follower", t);
+ }
+ }
}
public StreamResultFuture getReceivingStream(UUID planId)
@@ -248,6 +381,11 @@ public class StreamManager implements StreamManagerMBean
return followerStreams.get(planId);
}
+ public StreamResultFuture getInitiatorStream(UUID planId)
+ {
+ return initiatorStreams.get(planId);
+ }
+
public void addNotificationListener(NotificationListener listener,
NotificationFilter filter, Object handback)
{
notifier.addNotificationListener(listener, filter, handback);
@@ -282,4 +420,9 @@ public class StreamManager implements StreamManagerMBean
return streamResultFuture.getSession(peer, sessionIndex);
}
+
+ public interface StreamListener
+ {
+ default void onRegister(StreamResultFuture result) {}
+ }
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 66e99be..8f11587 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -62,7 +62,7 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
* @param planId Stream plan ID
* @param streamOperation Stream streamOperation
*/
- private StreamResultFuture(UUID planId, StreamOperation streamOperation,
StreamCoordinator coordinator)
+ public StreamResultFuture(UUID planId, StreamOperation streamOperation,
StreamCoordinator coordinator)
{
this.planId = planId;
this.streamOperation = streamOperation;
@@ -208,7 +208,16 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
{
// delegate to listener
for (StreamEventHandler listener : eventListeners)
- listener.handleStreamEvent(event);
+ {
+ try
+ {
+ listener.handleStreamEvent(event);
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Unexpected exception in listern while calling
handleStreamEvent", t);
+ }
+ }
}
private synchronized void maybeComplete()
diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java
b/src/java/org/apache/cassandra/streaming/StreamingState.java
new file mode 100644
index 0000000..b7ef61e
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/StreamingState.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.virtual.SimpleDataSet;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class StreamingState implements StreamEventHandler
+{
+ private static final Logger logger =
LoggerFactory.getLogger(StreamingState.class);
+
+ public static final long ELEMENT_SIZE = ObjectSizes.measureDeep(new
StreamingState(UUID.randomUUID(), StreamOperation.OTHER, false));
+
+ public enum Status
+ {INIT, START, SUCCESS, FAILURE}
+
+ private final long createdAtMillis = Clock.Global.currentTimeMillis();
+
+ // while streaming is running, this is a cache of StreamInfo seen with
progress state
+ // the reason for the cache is that StreamSession drops data after tasks
(recieve/send) complete, this makes
+ // it so that current state of a future tracks work pending rather than
work done, cache solves this by not deleting
+ // when tasks complete
+ // To lower memory costs, clear this after the stream completes
+ private ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = new
ConcurrentHashMap<>();
+
+ private final UUID id;
+ private final boolean follower;
+ private final StreamOperation operation;
+ private Set<InetSocketAddress> peers = null;
+ private Sessions sessions = Sessions.EMPTY;
+
+ private Status status;
+ private String completeMessage = null;
+
+ private final long[] stateTimesNanos;
+ private volatile long lastUpdatedAtNanos;
+
+ // API for state changes
+ public final Phase phase = new Phase();
+
+ public StreamingState(StreamResultFuture result)
+ {
+ this(result.planId, result.streamOperation,
result.getCoordinator().isFollower());
+ }
+
+ private StreamingState(UUID planId, StreamOperation streamOperation,
boolean follower)
+ {
+ this.id = planId;
+ this.operation = streamOperation;
+ this.follower = follower;
+ this.stateTimesNanos = new long[Status.values().length];
+ updateState(Status.INIT);
+ }
+
+ public UUID id()
+ {
+ return id;
+ }
+
+ public boolean follower()
+ {
+ return follower;
+ }
+
+ public StreamOperation operation()
+ {
+ return operation;
+ }
+
+ public Set<InetSocketAddress> peers()
+ {
+ Set<InetSocketAddress> peers = this.peers;
+ if (peers != null)
+ return peers;
+ ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
+ if (streamProgress != null)
+ return streamProgress.keySet();
+ return Collections.emptySet();
+ }
+
+ public Status status()
+ {
+ return status;
+ }
+
+ public Sessions sessions()
+ {
+ return sessions;
+ }
+
+ public boolean isComplete()
+ {
+ switch (status)
+ {
+ case SUCCESS:
+ case FAILURE:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public StreamResultFuture future()
+ {
+ if (follower)
+ return StreamManager.instance.getReceivingStream(id);
+ else
+ return StreamManager.instance.getInitiatorStream(id);
+ }
+
+ public float progress()
+ {
+ switch (status)
+ {
+ case INIT:
+ return 0;
+ case START:
+ return Math.min(0.99f, sessions().progress().floatValue());
+ case SUCCESS:
+ case FAILURE:
+ return 1;
+ default:
+ throw new AssertionError("unknown state: " + status);
+ }
+ }
+
+ public EnumMap<Status, Long> stateTimesMillis()
+ {
+ EnumMap<Status, Long> map = new EnumMap<>(Status.class);
+ for (int i = 0; i < stateTimesNanos.length; i++)
+ {
+ long nanos = stateTimesNanos[i];
+ if (nanos != 0)
+ map.put(Status.values()[i], nanosToMillis(nanos));
+ }
+ return map;
+ }
+
+ public long durationMillis()
+ {
+ long endNanos = lastUpdatedAtNanos;
+ if (!isComplete())
+ endNanos = Clock.Global.nanoTime();
+ return TimeUnit.NANOSECONDS.toMillis(endNanos - stateTimesNanos[0]);
+ }
+
+ public long lastUpdatedAtMillis()
+ {
+ return nanosToMillis(lastUpdatedAtNanos);
+ }
+
+ public long lastUpdatedAtNanos()
+ {
+ return lastUpdatedAtNanos;
+ }
+
+ public String failureCause()
+ {
+ if (status == Status.FAILURE)
+ return completeMessage;
+ return null;
+ }
+
+ public String successMessage()
+ {
+ if (status == Status.SUCCESS)
+ return completeMessage;
+ return null;
+ }
+
+ @Override
+ public String toString()
+ {
+ TableBuilder table = new TableBuilder();
+ table.add("id", id.toString());
+ table.add("status", status().name().toLowerCase());
+ table.add("progress", (progress() * 100) + "%");
+ table.add("duration_ms", Long.toString(durationMillis()));
+ table.add("last_updated_ms", Long.toString(lastUpdatedAtMillis()));
+ table.add("failure_cause", failureCause());
+ table.add("success_message", successMessage());
+ for (Map.Entry<Status, Long> e : stateTimesMillis().entrySet())
+ table.add("status_" + e.getKey().name().toLowerCase() + "_ms",
e.toString());
+ return table.toString();
+ }
+
+ @Override
+ public synchronized void handleStreamEvent(StreamEvent event)
+ {
+ ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
+ if (streamProgress == null)
+ {
+ logger.warn("Got stream event {} after the stream completed",
event.eventType);
+ return;
+ }
+ try
+ {
+ switch (event.eventType)
+ {
+ case STREAM_PREPARED:
+ streamPrepared((StreamEvent.SessionPreparedEvent) event);
+ break;
+ case STREAM_COMPLETE:
+ // currently not taking track of state, so ignore
+ break;
+ case FILE_PROGRESS:
+ streamProgress((StreamEvent.ProgressEvent) event);
+ break;
+ default:
+ logger.warn("Unknown stream event type: {}",
event.eventType);
+ }
+ }
+ catch (Throwable t)
+ {
+ logger.warn("Unexpected exception handling stream event", t);
+ }
+ sessions = Sessions.create(streamProgress.values());
+ lastUpdatedAtNanos = Clock.Global.nanoTime();
+ }
+
+ private void streamPrepared(StreamEvent.SessionPreparedEvent event)
+ {
+ SessionInfo session = new SessionInfo(event.session);
+ streamProgress.putIfAbsent(session.peer, session);
+ }
+
+ private void streamProgress(StreamEvent.ProgressEvent event)
+ {
+ SessionInfo info = streamProgress.get(event.progress.peer);
+ if (info != null)
+ {
+ info.updateProgress(event.progress);
+ }
+ else
+ {
+ logger.warn("[Stream #{}} ID#{}] Recieved stream progress before
prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+ }
+ }
+
+ @Override
+ public synchronized void onSuccess(@Nullable StreamState state)
+ {
+ ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
+ if (streamProgress != null)
+ {
+ sessions = Sessions.create(streamProgress.values());
+ peers = new HashSet<>(streamProgress.keySet());
+ this.streamProgress = null;
+ updateState(Status.SUCCESS);
+ }
+ }
+
+ @Override
+ public synchronized void onFailure(Throwable throwable)
+ {
+ ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
+ if (streamProgress != null)
+ {
+ sessions = Sessions.create(streamProgress.values());
+ peers = new HashSet<>(streamProgress.keySet());
+ this.streamProgress = null;
+ }
+ completeMessage = Throwables.getStackTraceAsString(throwable);
+ updateState(Status.FAILURE);
+ }
+
+ private synchronized void updateState(Status state)
+ {
+ this.status = state;
+ long now = Clock.Global.nanoTime();
+ stateTimesNanos[state.ordinal()] = now;
+ lastUpdatedAtNanos = now;
+ }
+
+ private long nanosToMillis(long nanos)
+ {
+ // nanos - creationTimeNanos = delta since init
+ return createdAtMillis + TimeUnit.NANOSECONDS.toMillis(nanos -
stateTimesNanos[0]);
+ }
+
+ public class Phase
+ {
+ public void start()
+ {
+ updateState(Status.START);
+ }
+ }
+
+ public static class Sessions
+ {
+ public static final Sessions EMPTY = new Sessions(0, 0, 0, 0, 0, 0, 0,
0);
+
+ public final long bytesToReceive, bytesReceived;
+ public final long bytesToSend, bytesSent;
+ public final long filesToReceive, filesReceived;
+ public final long filesToSend, filesSent;
+
+ public Sessions(long bytesToReceive, long bytesReceived, long
bytesToSend, long bytesSent, long filesToReceive, long filesReceived, long
filesToSend, long filesSent)
+ {
+ this.bytesToReceive = bytesToReceive;
+ this.bytesReceived = bytesReceived;
+ this.bytesToSend = bytesToSend;
+ this.bytesSent = bytesSent;
+ this.filesToReceive = filesToReceive;
+ this.filesReceived = filesReceived;
+ this.filesToSend = filesToSend;
+ this.filesSent = filesSent;
+ }
+
+ public static String columns()
+ {
+ return " bytes_to_receive bigint, \n" +
+ " bytes_received bigint, \n" +
+ " bytes_to_send bigint, \n" +
+ " bytes_sent bigint, \n" +
+ " files_to_receive bigint, \n" +
+ " files_received bigint, \n" +
+ " files_to_send bigint, \n" +
+ " files_sent bigint, \n";
+ }
+
+ public static Sessions create(Collection<SessionInfo> sessions)
+ {
+ long bytesToReceive = 0;
+ long bytesReceived = 0;
+ long filesToReceive = 0;
+ long filesReceived = 0;
+ long bytesToSend = 0;
+ long bytesSent = 0;
+ long filesToSend = 0;
+ long filesSent = 0;
+ for (SessionInfo session : sessions)
+ {
+ bytesToReceive += session.getTotalSizeToReceive();
+ bytesReceived += session.getTotalSizeReceived();
+
+ filesToReceive += session.getTotalFilesToReceive();
+ filesReceived += session.getTotalFilesReceived();
+
+ bytesToSend += session.getTotalSizeToSend();
+ bytesSent += session.getTotalSizeSent();
+
+ filesToSend += session.getTotalFilesToSend();
+ filesSent += session.getTotalFilesSent();
+ }
+ if (0 == bytesToReceive && 0 == bytesReceived && 0 ==
filesToReceive && 0 == filesReceived && 0 == bytesToSend && 0 == bytesSent && 0
== filesToSend && 0 == filesSent)
+ return EMPTY;
+ return new Sessions(bytesToReceive, bytesReceived,
+ bytesToSend, bytesSent,
+ filesToReceive, filesReceived,
+ filesToSend, filesSent);
+ }
+
+ public boolean isEmpty()
+ {
+ return this == EMPTY;
+ }
+
+ public BigDecimal receivedBytesPercent()
+ {
+ return div(bytesReceived, bytesToReceive);
+ }
+
+ public BigDecimal sentBytesPercent()
+ {
+ return div(bytesSent, bytesToSend);
+ }
+
+ public BigDecimal progress()
+ {
+ return div(bytesSent + bytesReceived, bytesToSend +
bytesToReceive);
+ }
+
+ private static BigDecimal div(long a, long b)
+ {
+ // not "correct" but its what you would do if this happened...
+ if (b == 0)
+ return BigDecimal.ZERO;
+ return BigDecimal.valueOf(a).divide(BigDecimal.valueOf(b), 4,
RoundingMode.HALF_UP);
+ }
+
+ public void update(SimpleDataSet ds)
+ {
+ if (isEmpty())
+ return;
+ ds.column("bytes_to_receive", bytesToReceive)
+ .column("bytes_received", bytesReceived)
+ .column("bytes_to_send", bytesToSend)
+ .column("bytes_sent", bytesSent)
+ .column("files_to_receive", filesToReceive)
+ .column("files_received", filesReceived)
+ .column("files_to_send", filesToSend)
+ .column("files_sent", filesSent);
+ }
+ }
+}
diff --git
a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
index e504dae..b8c7487 100644
---
a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
+++
b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -87,7 +87,7 @@ public class StreamEventJMXNotifier extends
NotificationBroadcasterSupport imple
Notification notif = new
Notification(StreamEvent.class.getCanonicalName() + ".failure",
StreamManagerMBean.OBJECT_NAME,
seq.getAndIncrement());
- notif.setUserData(t.fillInStackTrace().toString());
+ notif.setUserData(t.toString());
sendNotification(notif);
}
}
diff --git
a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java
b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java
index 166ed3d..025716b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/formatter/TableBuilder.java
@@ -18,7 +18,11 @@
package org.apache.cassandra.tools.nodetool.formatter;
+import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
+import java.io.UncheckedIOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -116,6 +120,22 @@ public class TableBuilder
}
}
+ @Override
+ public String toString()
+ {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ try (PrintStream stream = new PrintStream(os, true,
StandardCharsets.UTF_8.displayName()))
+ {
+ printTo(stream);
+ stream.flush();
+ return os.toString(StandardCharsets.UTF_8.displayName());
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ throw new UncheckedIOException(e);
+ }
+ }
+
/**
* Share max offsets across multiple TableBuilders
*/
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e5a933d..38cfef0 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -120,6 +120,7 @@ import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.StorageServiceMBean;
import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
import org.apache.cassandra.service.snapshot.SnapshotManager;
+import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamReceiveTask;
import org.apache.cassandra.streaming.StreamTransferTask;
import org.apache.cassandra.streaming.async.NettyStreamingChannel;
@@ -667,6 +668,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
throw new IllegalStateException(String.format("%s != %s",
FBUtilities.getBroadcastAddressAndPort(), broadcastAddress()));
ActiveRepairService.instance.start();
+ StreamManager.instance.start();
CassandraDaemon.getInstanceForTesting().completeSetup();
}
catch (Throwable t)
@@ -740,6 +742,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
NettyStreamingChannel::shutdown,
() -> StreamReceiveTask.shutdownAndWait(1L,
MINUTES),
() -> StreamTransferTask.shutdownAndWait(1L,
MINUTES),
+ () -> StreamManager.instance.stop(),
() ->
SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
() ->
IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
() ->
ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
new file mode 100644
index 0000000..db89669
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RebuildStreamingTest extends TestBaseImpl
+{
+ @Test
+ public void test() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(c ->
c.with(Feature.values()).set("stream_entire_sstables", false))
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id
varchar PRIMARY KEY);"));
+ cluster.stream().forEach(i ->
i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
+ IInvokableInstance first = cluster.get(1);
+ IInvokableInstance second = cluster.get(2);
+ long expectedFiles = 10;
+ for (int i = 0; i < expectedFiles; i++)
+ {
+ first.executeInternal(withKeyspace("insert into
%s.users(user_id) values (?)"), "dcapwell" + i);
+ first.flush(KEYSPACE);
+ }
+
+ second.nodetoolResult("rebuild", "--keyspace",
KEYSPACE).asserts().success();
+
+ SimpleQueryResult qr = first.executeInternalWithResult("SELECT *
FROM system_views.streaming");
+ String txt = QueryResultUtil.expand(qr);
+ qr.reset();
+ assertThat(qr.toObjectArrays().length).describedAs("Found
rows\n%s", txt).isEqualTo(1);
+ assertThat(qr.hasNext()).isTrue();
+ Row row = qr.next();
+ QueryResultUtil.assertThat(row)
+ .isEqualTo("peers",
Collections.singletonList("/127.0.0.2:7012"))
+ .isEqualTo("follower", true)
+ .isEqualTo("operation", "Rebuild")
+ .isEqualTo("status", "success")
+ .isEqualTo("progress_percentage", 100.0F)
+ .isEqualTo("success_message",
null).isEqualTo("failure_cause", null)
+ .isEqualTo("files_sent", expectedFiles)
+ .columnsEqualTo("files_sent", "files_to_send")
+ .columnsEqualTo("bytes_sent", "bytes_to_send")
+ .isEqualTo("files_received", 0L)
+ .columnsEqualTo("files_received",
"files_to_receive", "bytes_received", "bytes_to_receive");
+ long totalBytes = row.getLong("bytes_sent");
+ assertThat(totalBytes).isGreaterThan(0);
+
+ qr = second.executeInternalWithResult("SELECT * FROM
system_views.streaming");
+ txt = QueryResultUtil.expand(qr);
+ qr.reset();
+ assertThat(qr.toObjectArrays().length).describedAs("Found
rows\n%s", txt).isEqualTo(1);
+ assertThat(qr.hasNext()).isTrue();
+
+ QueryResultUtil.assertThat(qr.next())
+ .isEqualTo("peers",
Collections.singletonList("/127.0.0.1:7012"))
+ .isEqualTo("follower", false)
+ .isEqualTo("operation", "Rebuild")
+ .isEqualTo("status", "success")
+ .isEqualTo("progress_percentage", 100.0F)
+ .isEqualTo("success_message",
null).isEqualTo("failure_cause", null)
+ .columnsEqualTo("files_to_receive",
"files_received").isEqualTo("files_received", expectedFiles)
+ .columnsEqualTo("bytes_to_receive",
"bytes_received").isEqualTo("bytes_received", totalBytes)
+ .columnsEqualTo("files_sent", "files_to_send",
"bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L);
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
index f9c1d90..58842bc 100644
---
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
+++
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -23,6 +23,7 @@ import java.util.function.Predicate;
import org.apache.cassandra.distributed.api.Row;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
import org.assertj.core.api.Assertions;
public class QueryResultUtil
@@ -69,21 +70,70 @@ public class QueryResultUtil
return true;
}
- public static AssertHelper assertThat(SimpleQueryResult qr)
+ public static SimpleQueryResultAssertHelper assertThat(SimpleQueryResult
qr)
{
- return new AssertHelper(qr);
+ return new SimpleQueryResultAssertHelper(qr);
}
- public static class AssertHelper
+ public static RowAssertHelper assertThat(Row row)
+ {
+ return new RowAssertHelper(row);
+ }
+
+ public static String expand(SimpleQueryResult qr)
+ {
+ StringBuilder sb = new StringBuilder();
+ int rowNum = 1;
+ while (qr.hasNext())
+ {
+ sb.append("@ Row ").append(rowNum).append('\n');
+ TableBuilder table = new TableBuilder('|');
+ Row next = qr.next();
+ for (String column : qr.names())
+ {
+ Object value = next.get(column);
+ table.add(column, value == null ? null : value.toString());
+ }
+ sb.append(table);
+ }
+ return sb.toString();
+ }
+
+ public static class RowAssertHelper
+ {
+ private final Row row;
+
+ public RowAssertHelper(Row row)
+ {
+ this.row = row;
+ }
+
+ public RowAssertHelper isEqualTo(String column, Object expected)
+ {
+ Object actual = row.get(column);
+ Assertions.assertThat(actual).describedAs("Column %s had
unexpected value", column).isEqualTo(expected);
+ return this;
+ }
+
+ public RowAssertHelper columnsEqualTo(String first, String... others)
+ {
+ Object expected = row.get(first);
+ for (String other : others)
+
Assertions.assertThat(row.<Object>get(other)).describedAs("Columns %s and %s
are not equal", first, other).isEqualTo(expected);
+ return this;
+ }
+ }
+
+ public static class SimpleQueryResultAssertHelper
{
private final SimpleQueryResult qr;
- private AssertHelper(SimpleQueryResult qr)
+ private SimpleQueryResultAssertHelper(SimpleQueryResult qr)
{
this.qr = qr;
}
- public AssertHelper contains(Object... values)
+ public SimpleQueryResultAssertHelper contains(Object... values)
{
qr.reset();
if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a,
values)))
@@ -91,7 +141,7 @@ public class QueryResultUtil
return this;
}
- public AssertHelper contains(Row row)
+ public SimpleQueryResultAssertHelper contains(Row row)
{
qr.reset();
if (!QueryResultUtil.contains(qr, a -> QueryResultUtil.equals(a,
row)))
@@ -99,7 +149,7 @@ public class QueryResultUtil
return this;
}
- public AssertHelper contains(Predicate<Row> fn)
+ public SimpleQueryResultAssertHelper contains(Predicate<Row> fn)
{
qr.reset();
if (!QueryResultUtil.contains(qr, fn))
@@ -107,7 +157,7 @@ public class QueryResultUtil
return this;
}
- public AssertHelper isEqualTo(Object... values)
+ public SimpleQueryResultAssertHelper isEqualTo(Object... values)
{
Assertions.assertThat(qr.toObjectArrays())
.hasSize(1)
@@ -115,13 +165,13 @@ public class QueryResultUtil
return this;
}
- public AssertHelper hasSize(int size)
+ public SimpleQueryResultAssertHelper hasSize(int size)
{
Assertions.assertThat(qr.toObjectArrays()).hasSize(size);
return this;
}
- public AssertHelper hasSizeGreaterThan(int size)
+ public SimpleQueryResultAssertHelper hasSizeGreaterThan(int size)
{
Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size);
return this;
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 6844cb8..66e9acf 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -1442,6 +1442,7 @@ public abstract class CQLTester
Assert.assertEquals(String.format("Invalid number of (expected)
values provided for row %d", i), expected == null ? 1 : expected.length,
meta.size());
+ StringBuilder error = new StringBuilder();
for (int j = 0; j < meta.size(); j++)
{
ColumnSpecification column = meta.get(j);
@@ -1454,15 +1455,17 @@ public abstract class CQLTester
{
Object actualValueDecoded = actualValue == null ? null :
column.type.getSerializer().deserialize(actualValue);
if (!Objects.equal(expected != null ? expected[j] : null,
actualValueDecoded))
- Assert.fail(String.format("Invalid value for row %d
column %d (%s of type %s), expected <%s> but got <%s>",
- i,
- j,
- column.name,
- column.type.asCQL3Type(),
-
formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null,
column.type),
- formatValue(actualValue,
column.type)));
+ error.append(String.format("Invalid value for row %d
column %d (%s of type %s), expected <%s> but got <%s>",
+ i,
+ j,
+ column.name,
+ column.type.asCQL3Type(),
+
formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null,
column.type),
+ formatValue(actualValue,
column.type))).append("\n");
}
}
+ if (error.length() > 0)
+ Assert.fail(error.toString());
i++;
}
diff --git a/test/unit/org/apache/cassandra/db/MmapFileTest.java
b/test/unit/org/apache/cassandra/db/MmapFileTest.java
index a666426..4619df3 100644
--- a/test/unit/org/apache/cassandra/db/MmapFileTest.java
+++ b/test/unit/org/apache/cassandra/db/MmapFileTest.java
@@ -25,14 +25,24 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.File;
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.io.util.FileUtils;
public class MmapFileTest
{
+ @BeforeClass
+ public static void setup()
+ {
+ // PathUtils touches StorageService which touches StreamManager which
requires configs be setup
+ DatabaseDescriptor.daemonInitialization();
+ }
+
+
/**
* Verifies that {@link sun.misc.Cleaner} works and that mmap'd files can
be deleted.
*/
diff --git a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
index 3e28040..d28d0dc 100644
--- a/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
+++ b/test/unit/org/apache/cassandra/db/filter/ColumnFilterTest.java
@@ -92,6 +92,8 @@ public class ColumnFilterTest
@BeforeClass
public static void beforeClass()
{
+ // Gossiper touches StorageService which touches StreamManager which
requires configs be setup
+ DatabaseDescriptor.daemonInitialization();
DatabaseDescriptor.setSeedProvider(Arrays::asList);
DatabaseDescriptor.setEndpointSnitch(new SimpleSnitch());
DatabaseDescriptor.setDefaultFailureDetector();
diff --git
a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
new file mode 100644
index 0000000..270305a
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamCoordinator;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.StreamSummary;
+import org.apache.cassandra.streaming.StreamingChannel;
+import org.apache.cassandra.streaming.StreamingState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.assertj.core.util.Throwables;
+
+public class StreamingVirtualTableTest extends CQLTester
+{
+ private static final String KS_NAME = "vts";
+ private static final InetAddressAndPort PEER1 = address(127, 0, 0, 1);
+ private static final InetAddressAndPort PEER2 = address(127, 0, 0, 2);
+ private static final InetAddressAndPort PEER3 = address(127, 0, 0, 3);
+ private static String TABLE_NAME;
+
+ @BeforeClass
+ public static void setup()
+ {
+ CQLTester.setUpClass();
+ StreamingVirtualTable table = new StreamingVirtualTable(KS_NAME);
+ TABLE_NAME = table.toString();
+ VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(KS_NAME,
ImmutableList.of(table)));
+ }
+
+ @Before
+ public void clearState()
+ {
+ StreamManager.instance.clearStates();
+ }
+
+ @Test
+ public void empty() throws Throwable
+ {
+ assertEmpty(execute(t("select * from %s")));
+ }
+
+ @Test
+ public void single() throws Throwable
+ {
+ StreamingState state = stream(true);
+ assertRows(execute(t("select id, follower, operation, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+ new Object[] { state.id(), true, "Repair",
Collections.emptyList(), "init", 0F, new Date(state.lastUpdatedAtMillis()),
null, null });
+
+ state.phase.start();
+ assertRows(execute(t("select id, follower, operation, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+ new Object[] { state.id(), true, "Repair",
Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()),
null, null });
+
+ state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.PREPARING)));
+
+ state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR,
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(),
Collections.emptyList(), StreamSession.State.COMPLETE))));
+ assertRows(execute(t("select id, follower, operation, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+ new Object[] { state.id(), true, "Repair",
Arrays.asList(address(127, 0, 0, 2).toString()), "success", 100F, new
Date(state.lastUpdatedAtMillis()), null, null });
+ }
+
+ @Test
+ public void progressInitiator() throws Throwable
+ {
+ progress(false);
+ }
+
+ @Test
+ public void progressFollower() throws Throwable
+ {
+ progress(true);
+ }
+
+ public void progress(boolean follower) throws Throwable
+ {
+ StreamingState state = stream(follower);
+ StreamResultFuture future = state.future();
+ state.phase.start();
+
+ SessionInfo s1 = new SessionInfo(PEER2, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
+ SessionInfo s2 = new SessionInfo(PEER3, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
+
+ state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), s1));
+ state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), s2));
+
+ long bytesToReceive = 0, bytesToSend = 0;
+ long filesToReceive = 0, filesToSend = 0;
+ for (SessionInfo s : Arrays.asList(s1, s2))
+ {
+ bytesToReceive += s.getTotalSizeToReceive();
+ bytesToSend += s.getTotalSizeToSend();
+ filesToReceive += s.getTotalFilesToReceive();
+ filesToSend += s.getTotalFilesToSend();
+ }
+ assertRows(execute(t("select id, follower, peers, status,
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send,
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from
%s")),
+ new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 0F, bytesToReceive,
0L, bytesToSend, 0L, filesToReceive, 0L, filesToSend, 0L });
+
+ // update progress
+ long bytesReceived = 0, bytesSent = 0;
+ for (SessionInfo s : Arrays.asList(s1, s2))
+ {
+ long in = s.getTotalFilesToReceive() - 1;
+ long inBytes = s.getTotalSizeToReceive() - in;
+ long out = s.getTotalFilesToSend() - 1;
+ long outBytes = s.getTotalSizeToSend() - out;
+ state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(),
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0",
ProgressInfo.Direction.IN, inBytes, inBytes)));
+ state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(),
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0",
ProgressInfo.Direction.OUT, outBytes, outBytes)));
+ bytesReceived += inBytes;
+ bytesSent += outBytes;
+ }
+
+ assertRows(execute(t("select id, follower, peers, status,
bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive,
files_received, files_to_send, files_sent from %s")),
+ new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive,
bytesReceived, bytesToSend, bytesSent, filesToReceive, 2L, filesToSend, 2L });
+
+ // finish
+ for (SessionInfo s : Arrays.asList(s1, s2))
+ {
+ // complete the rest
+ for (long i = 1; i < s.getTotalFilesToReceive(); i++)
+ state.handleStreamEvent(new
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort)
s.peer, 0, Long.toString(i), ProgressInfo.Direction.IN, 1, 1)));
+ for (long i = 1; i < s.getTotalFilesToSend(); i++)
+ state.handleStreamEvent(new
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort)
s.peer, 0, Long.toString(i), ProgressInfo.Direction.OUT, 1, 1)));
+ }
+
+ assertRows(execute(t("select id, follower, peers, status,
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send,
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from
%s")),
+ new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 99F,
bytesToReceive, bytesToReceive, bytesToSend, bytesToSend, filesToReceive,
filesToReceive, filesToSend, filesToSend });
+
+ state.onSuccess(future.getCurrentState());
+ assertRows(execute(t("select id, follower, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+ new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "success", 100F, new
Date(state.lastUpdatedAtMillis()), null, null });
+ }
+
+ private static StreamSummary streamSummary()
+ {
+ int files = ThreadLocalRandom.current().nextInt(2, 10);
+ return new StreamSummary(TableId.fromUUID(UUID.randomUUID()), files,
files * 1024);
+ }
+
+ @Test
+ public void failed() throws Throwable
+ {
+ StreamingState state = stream(true);
+ RuntimeException t = new RuntimeException("You failed!");
+ state.onFailure(t);
+ assertRows(execute(t("select id, follower, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
+ new Object[] { state.id(), true, Collections.emptyList(),
"failure", 100F, new Date(state.lastUpdatedAtMillis()),
Throwables.getStackTrace(t), null });
+ }
+
+ private static String t(String query)
+ {
+ return String.format(query, TABLE_NAME);
+ }
+
+ private static StreamingState stream(boolean follower)
+ {
+ StreamResultFuture future = new StreamResultFuture(UUID.randomUUID(),
StreamOperation.REPAIR, new StreamCoordinator(StreamOperation.REPAIR, 0,
StreamingChannel.Factory.Global.streamingFactory(), follower, false, null,
null) {
+ // initiator requires active sessions exist, else the future
becomes success right away.
+ @Override
+ public synchronized boolean hasActiveSessions()
+ {
+ return true;
+ }
+ });
+ StreamingState state = new StreamingState(future);
+ if (follower) StreamManager.instance.putFollowerStream(future);
+ else StreamManager.instance.putInitiatorStream(future);
+ StreamManager.instance.putStreamingState(state);
+ future.addEventListener(state);
+ return state;
+ }
+
+ private static InetAddressAndPort address(int a, int b, int c, int d)
+ {
+ try
+ {
+ return InetAddressAndPort.getByAddress(new byte[] {(byte) a,
(byte) b, (byte) c, (byte) d});
+ }
+ catch (UnknownHostException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/io/util/FileTest.java
b/test/unit/org/apache/cassandra/io/util/FileTest.java
index 0fb415a..d7340db 100644
--- a/test/unit/org/apache/cassandra/io/util/FileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/FileTest.java
@@ -33,6 +33,7 @@ import com.google.common.util.concurrent.RateLimiter;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.psjava.util.Triple;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -51,6 +52,9 @@ public class FileTest
dir = new java.io.File(parent, dirName); //checkstyle: permit this
instantiation
dir.mkdirs();
new File(dir).deleteRecursiveOnExit();
+
+ // PathUtils touches StorageService which touches StreamManager which
requires configs be setup
+ DatabaseDescriptor.daemonInitialization();
}
diff --git
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
index b70e514..11efc3c 100644
---
a/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
+++
b/test/unit/org/apache/cassandra/service/AbstractFilesystemOwnershipCheckTest.java
@@ -29,10 +29,12 @@ import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assume;
+import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.StartupChecksOptions;
import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.io.util.File;
@@ -175,6 +177,13 @@ public abstract class AbstractFilesystemOwnershipCheckTest
file.delete();
}
+ @BeforeClass
+ public static void setupConfig()
+ {
+ // PathUtils touches StorageService which touches StreamManager which
requires configs be setup
+ DatabaseDescriptor.daemonInitialization();
+ }
+
@After
public void teardown() throws IOException
{
diff --git a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
index 86b63a2..5227e51 100644
--- a/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
+++ b/test/unit/org/apache/cassandra/utils/binlog/BinLogTest.java
@@ -28,9 +28,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.File;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import net.openhft.chronicle.queue.ChronicleQueue;
@@ -60,6 +62,13 @@ public class BinLogTest
private BinLog binLog;
private Path path;
+ @BeforeClass
+ public static void setup()
+ {
+ // PathUtils touches StorageService which touches StreamManager which
requires configs be setup
+ DatabaseDescriptor.daemonInitialization();
+ }
+
@Before
public void setUp() throws Exception
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]