This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.1 by this push:
new 5be1038c5d Streaming progress virtual table lock contention can
trigger TCP_USER_TIMEOUT and fail streaming
5be1038c5d is described below
commit 5be1038c5d38af32d3cbb0545d867f21304f3a46
Author: David Capwell <[email protected]>
AuthorDate: Wed Jan 11 13:40:57 2023 -0800
Streaming progress virtual table lock contention can trigger
TCP_USER_TIMEOUT and fail streaming
patch by David Capwell; reviewed by Abe Ratnofsky, Jon Meredith for
CASSANDRA-18110
---
CHANGES.txt | 3 +-
conf/cassandra.yaml | 10 ++
src/java/org/apache/cassandra/config/Config.java | 3 +
.../cassandra/config/DatabaseDescriptor.java | 27 ++++
.../org/apache/cassandra/config/DurationSpec.java | 10 ++
.../streaming/CassandraCompressedStreamReader.java | 7 +-
.../streaming/CassandraCompressedStreamWriter.java | 3 +-
.../CassandraEntireSSTableStreamReader.java | 2 +-
.../CassandraEntireSSTableStreamWriter.java | 2 +-
.../db/streaming/CassandraStreamReader.java | 7 +-
.../db/streaming/CassandraStreamWriter.java | 6 +-
.../apache/cassandra/streaming/ProgressInfo.java | 5 +-
.../apache/cassandra/streaming/StreamEvent.java | 4 +-
.../apache/cassandra/streaming/StreamManager.java | 26 ++++
.../cassandra/streaming/StreamManagerMBean.java | 20 +++
.../cassandra/streaming/StreamResultFuture.java | 15 ++-
.../apache/cassandra/streaming/StreamSession.java | 22 ++--
.../apache/cassandra/streaming/StreamingState.java | 143 +++++----------------
.../management/ProgressInfoCompositeData.java | 3 +
.../test/streaming/RebuildStreamingTest.java | 33 ++++-
.../test/streaming/StreamingStatsDisabledTest.java | 65 ++++++++++
.../distributed/util/QueryResultUtil.java | 7 +
.../db/virtual/StreamingVirtualTableTest.java | 85 +++++++++---
.../cassandra/streaming/SessionInfoTest.java | 4 +-
24 files changed, 361 insertions(+), 151 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c6ceac0575..c5d192a9c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
4.1.1
-* Fix perpetual load of denylist on read in cases where denylist can never be
loaded (CASSANDRA-18116)
+ * Streaming progress virtual table lock contention can trigger
TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110)
+ * Fix perpetual load of denylist on read in cases where denylist can never be
loaded (CASSANDRA-18116)
Merged from 4.0:
* Avoid ConcurrentModificationException in STCS/DTCS/TWCS.getSSTables
(CASSANDRA-17977)
* Restore internode custom tracing on 4.0's new messaging system
(CASSANDRA-17981)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ef7cc72605..f54c0d4617 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1161,6 +1161,16 @@ slow_query_log_timeout: 500ms
# bound (for example a few nodes with big files).
# streaming_connections_per_host: 1
+# Settings for stream stats tracking; used by system_views.streaming table
+# How long before a stream is evicted from tracking; this impacts both
historic and currently running
+# streams.
+# streaming_state_expires: 3d
+# How much memory may be used for tracking before evicting session from
tracking; once crossed
historic and currently running streams may be impacted.
+# streaming_state_size: 40MiB
+# Enable/Disable tracking of streaming stats
+# streaming_stats_enabled: true
+
# Allows denying configurable access (rw/rr) to operations on configured ks,
table, and partitions, intended for use by
# operators to manage cluster health vs application access. See
CASSANDRA-12106 and CEP-13 for more details.
# partition_denylist_enabled: false
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index df419e780c..8a59ca2cda 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -852,6 +852,9 @@ public class Config
public volatile DurationSpec.LongNanosecondsBound streaming_state_expires
= new DurationSpec.LongNanosecondsBound("3d");
public volatile DataStorageSpec.LongBytesBound streaming_state_size = new
DataStorageSpec.LongBytesBound("40MiB");
+ public volatile boolean streaming_stats_enabled = true;
+ public volatile DurationSpec.IntSecondsBound
streaming_slow_events_log_timeout = new DurationSpec.IntSecondsBound("10s");
+
/** The configuration of startup checks. */
public volatile Map<StartupCheckType, Map<String, Object>> startup_checks
= new HashMap<>();
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 62d1acdada..d2c529c3d4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -4294,6 +4294,33 @@ public class DatabaseDescriptor
}
}
+ public static boolean getStreamingStatsEnabled()
+ {
+ return conf.streaming_stats_enabled;
+ }
+
+ public static void setStreamingStatsEnabled(boolean streamingStatsEnabled)
+ {
+ if (conf.streaming_stats_enabled != streamingStatsEnabled)
+ {
+ logger.info("Setting streaming_stats_enabled to {}",
streamingStatsEnabled);
+ conf.streaming_stats_enabled = streamingStatsEnabled;
+ }
+ }
+
+ public static DurationSpec.IntSecondsBound
getStreamingSlowEventsLogTimeout() {
+ return conf.streaming_slow_events_log_timeout;
+ }
+
+ public static void setStreamingSlowEventsLogTimeout(String value) {
+ DurationSpec.IntSecondsBound next = new
DurationSpec.IntSecondsBound(value);
+ if (!conf.streaming_slow_events_log_timeout.equals(next))
+ {
+ logger.info("Setting streaming_slow_events_log to " + value);
+ conf.streaming_slow_events_log_timeout = next;
+ }
+ }
+
public static boolean isUUIDSSTableIdentifiersEnabled()
{
return conf.uuid_sstable_identifiers_enabled;
diff --git a/src/java/org/apache/cassandra/config/DurationSpec.java
b/src/java/org/apache/cassandra/config/DurationSpec.java
index ba7e9f8415..10d56c23eb 100644
--- a/src/java/org/apache/cassandra/config/DurationSpec.java
+++ b/src/java/org/apache/cassandra/config/DurationSpec.java
@@ -520,6 +520,16 @@ public abstract class DurationSpec
return new IntSecondsBound(value);
}
+ /**
+ * Returns this duration in the number of nanoseconds as an {@code int}
+ *
+ * @return this duration in number of nanoseconds or {@code
Integer.MAX_VALUE} if the number of nanoseconds is too large.
+ */
+ public int toNanoseconds()
+ {
+ return Ints.saturatedCast(unit().toNanos(quantity()));
+ }
+
/**
* Returns this duration in number of milliseconds as an {@code int}
*
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index dda874ba40..005a9aaa6c 100644
---
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -78,6 +78,7 @@ public class CassandraCompressedStreamReader extends
CassandraStreamReader
deserializer = new StreamDeserializer(cfs.metadata(), in,
inputVersion, getHeader(cfs.metadata()));
writer = createWriter(cfs, totalSize, repairedAt, pendingRepair,
format);
String filename = writer.getFilename();
+ String sectionName = filename + '-' + fileSeqNum;
int sectionIdx = 0;
for (SSTableReader.PartitionPositionBounds section : sections)
{
@@ -89,11 +90,15 @@ public class CassandraCompressedStreamReader extends
CassandraStreamReader
cis.position(section.lowerPosition);
in.reset(0);
+ long lastBytesRead = 0;
while (in.getBytesRead() < sectionLength)
{
writePartition(deserializer, writer);
// when compressed, report total bytes of compressed
chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(filename + '-' + fileSeqNum,
ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
+ long bytesRead = cis.chunkBytesRead();
+ long bytesDelta = bytesRead - lastBytesRead;
+ lastBytesRead = bytesRead;
+ session.progress(sectionName, ProgressInfo.Direction.IN,
bytesRead, bytesDelta, totalSize);
}
assert in.getBytesRead() == sectionLength;
}
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
index 99908f261b..41fd9b1651 100644
---
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
+++
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java
@@ -71,6 +71,7 @@ public class CassandraCompressedStreamWriter extends
CassandraStreamWriter
int sectionIdx = 0;
// stream each of the required sections of the file
+ String filename = sstable.descriptor.filenameFor(Component.DATA);
for (Section section : sections)
{
// length of the section to stream
@@ -94,7 +95,7 @@ public class CassandraCompressedStreamWriter extends
CassandraStreamWriter
bytesTransferred += toTransfer;
progress += toTransfer;
-
session.progress(sstable.descriptor.filenameFor(Component.DATA),
ProgressInfo.Direction.OUT, progress, totalSize);
+ session.progress(filename, ProgressInfo.Direction.OUT,
progress, toTransfer, totalSize);
}
}
logger.debug("[Stream #{}] Finished streaming file {} to {},
bytesTransferred = {}, totalSize = {}",
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
index 261c59ef63..515c85dea6 100644
---
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
+++
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java
@@ -122,7 +122,7 @@ public class CassandraEntireSSTableStreamReader implements
IStreamReader
prettyPrintMemory(totalSize));
writer.writeComponent(component.type, in, length);
- session.progress(writer.descriptor.filenameFor(component),
ProgressInfo.Direction.IN, length, length);
+ session.progress(writer.descriptor.filenameFor(component),
ProgressInfo.Direction.IN, length, length, length);
bytesRead += length;
logger.debug("[Stream #{}] Finished receiving {} component
from {}, componentSize = {}, readBytes = {}, totalSize = {}",
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
index 68546cefe6..3d679a515e 100644
---
a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
+++
b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java
@@ -93,7 +93,7 @@ public class CassandraEntireSSTableStreamWriter
long bytesWritten = out.writeFileToChannel(channel, limiter);
progress += bytesWritten;
- session.progress(sstable.descriptor.filenameFor(component),
ProgressInfo.Direction.OUT, bytesWritten, length);
+ session.progress(sstable.descriptor.filenameFor(component),
ProgressInfo.Direction.OUT, bytesWritten, bytesWritten, length);
logger.debug("[Stream #{}] Finished streaming {}.{} gen {}
component {} to {}, xfered = {}, length = {}, totalSize = {}",
session.planId(),
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 2cccee3a0a..04268f024c 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -126,11 +126,16 @@ public class CassandraStreamReader implements
IStreamReader
TrackedDataInputPlus in = new
TrackedDataInputPlus(streamCompressionInputStream);
deserializer = new StreamDeserializer(cfs.metadata(), in,
inputVersion, getHeader(cfs.metadata()));
writer = createWriter(cfs, totalSize, repairedAt, pendingRepair,
format);
+ String sequenceName = writer.getFilename() + '-' + fileSeqNum;
+ long lastBytesRead = 0;
while (in.getBytesRead() < totalSize)
{
writePartition(deserializer, writer);
// TODO move this to BytesReadTracker
- session.progress(writer.getFilename() + '-' + fileSeqNum,
ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+ long bytesRead = in.getBytesRead();
+ long bytesDelta = bytesRead - lastBytesRead;
+ lastBytesRead = bytesRead;
+ session.progress(sequenceName, ProgressInfo.Direction.IN,
bytesRead, bytesDelta, totalSize);
}
logger.debug("[Stream #{}] Finished receiving file #{} from {}
readBytes = {}, totalSize = {}",
session.planId(), fileSeqNum, session.peer,
FBUtilities.prettyPrintMemory(in.getBytesRead()),
FBUtilities.prettyPrintMemory(totalSize));
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
index d69f3eec59..9d9ea3c1fc 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamWriter.java
@@ -94,6 +94,7 @@ public class CassandraStreamWriter
long progress = 0L;
// stream each of the required sections of the file
+ String filename = sstable.descriptor.filenameFor(Component.DATA);
for (SSTableReader.PartitionPositionBounds section : sections)
{
long start = validator == null ? section.lowerPosition :
validator.chunkStart(section.lowerPosition);
@@ -112,8 +113,9 @@ public class CassandraStreamWriter
long lastBytesRead = write(proxy, validator, out, start,
transferOffset, toTransfer, bufferSize);
start += lastBytesRead;
bytesRead += lastBytesRead;
- progress += (lastBytesRead - transferOffset);
-
session.progress(sstable.descriptor.filenameFor(Component.DATA),
ProgressInfo.Direction.OUT, progress, totalSize);
+ long delta = lastBytesRead - transferOffset;
+ progress += delta;
+ session.progress(filename, ProgressInfo.Direction.OUT,
progress, delta, totalSize);
transferOffset = 0;
}
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index 2b306f8c1b..2ed78aca27 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -54,9 +54,11 @@ public class ProgressInfo implements Serializable
public final String fileName;
public final Direction direction;
public final long currentBytes;
+ public final long deltaBytes; // change from previous ProgressInfo
public final long totalBytes;
- public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String
fileName, Direction direction, long currentBytes, long totalBytes)
+ public ProgressInfo(InetAddressAndPort peer, int sessionIndex, String
fileName, Direction direction,
+ long currentBytes, long deltaBytes, long totalBytes)
{
assert totalBytes > 0;
@@ -65,6 +67,7 @@ public class ProgressInfo implements Serializable
this.fileName = fileName;
this.direction = direction;
this.currentBytes = currentBytes;
+ this.deltaBytes = deltaBytes;
this.totalBytes = totalBytes;
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java
b/src/java/org/apache/cassandra/streaming/StreamEvent.java
index be7ad3dea6..ff83a191e9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamEvent.java
+++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java
@@ -87,11 +87,13 @@ public abstract class StreamEvent
public static class SessionPreparedEvent extends StreamEvent
{
public final SessionInfo session;
+ public final StreamSession.PrepareDirection prepareDirection;
- public SessionPreparedEvent(TimeUUID planId, SessionInfo session)
+ public SessionPreparedEvent(TimeUUID planId, SessionInfo session,
StreamSession.PrepareDirection prepareDirection)
{
super(Type.STREAM_PREPARED, planId);
this.session = session;
+ this.prepareDirection = prepareDirection;
}
}
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java
b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 46ab422eff..408b6f4abe 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -230,6 +230,8 @@ public class StreamManager implements StreamManagerMBean
@Override
public void onRegister(StreamResultFuture result)
{
+ if (!DatabaseDescriptor.getStreamingStatsEnabled())
+ return;
// reason for synchronized rather than states.get is to detect
duplicates
// streaming shouldn't be producing duplicates as that would imply
a planId collision
synchronized (states)
@@ -312,6 +314,30 @@ public class StreamManager implements StreamManagerMBean
}));
}
+ @Override
+ public boolean getStreamingStatsEnabled()
+ {
+ return DatabaseDescriptor.getStreamingStatsEnabled();
+ }
+
+ @Override
+ public void setStreamingStatsEnabled(boolean streamingStatsEnabled)
+ {
+ DatabaseDescriptor.setStreamingStatsEnabled(streamingStatsEnabled);
+ }
+
+ @Override
+ public String getStreamingSlowEventsLogTimeout()
+ {
+ return
DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toString();
+ }
+
+ @Override
+ public void setStreamingSlowEventsLogTimeout(String value)
+ {
+ DatabaseDescriptor.setStreamingSlowEventsLogTimeout(value);
+ }
+
public void registerInitiator(final StreamResultFuture result)
{
result.addEventListener(notifier);
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
index f329596c4a..e49c059590 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -29,4 +29,24 @@ public interface StreamManagerMBean extends
NotificationEmitter
* Returns the current state of all ongoing streams.
*/
Set<CompositeData> getCurrentStreams();
+
+ /**
+ * @return whether the streaming virtual table should collect stats while
streaming is running
+ */
+ boolean getStreamingStatsEnabled();
+
+ /**
+ * enable/disable collection of streaming stats while streaming is running.
+ */
+ void setStreamingStatsEnabled(boolean streamingStatsEnabled);
+
+ /**
+ * @return current timeout for streaming slow events log
+ */
+ String getStreamingSlowEventsLogTimeout();
+
+ /**
+ * Sets the timeout for the streaming slow events log
+ */
+ void setStreamingSlowEventsLogTimeout(String value);
}
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index 22af238de6..b43203dee3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -17,11 +17,15 @@
*/
package org.apache.cassandra.streaming;
+import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.slf4j.Logger;
@@ -31,6 +35,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.utils.FBUtilities;
import static
org.apache.cassandra.streaming.StreamingChannel.Factory.Global.streamingFactory;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
/**
* A future on the result ({@link StreamState}) of a streaming plan.
@@ -53,6 +58,7 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
public final StreamOperation streamOperation;
private final StreamCoordinator coordinator;
private final Collection<StreamEventHandler> eventListeners = new
ConcurrentLinkedQueue<>();
+ private final long slowEventsLogTimeoutNanos =
DatabaseDescriptor.getStreamingSlowEventsLogTimeout().toNanoseconds();
/**
* Create new StreamResult of given {@code planId} and streamOperation.
@@ -175,7 +181,7 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
return planId.hashCode();
}
- void handleSessionPrepared(StreamSession session)
+ void handleSessionPrepared(StreamSession session,
StreamSession.PrepareDirection prepareDirection)
{
SessionInfo sessionInfo = session.getSessionInfo();
logger.info("[Stream #{} ID#{}] Prepare completed. Receiving {}
files({}), sending {} files({})",
@@ -185,7 +191,7 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToReceive()),
sessionInfo.getTotalFilesToSend(),
FBUtilities.prettyPrintMemory(sessionInfo.getTotalSizeToSend()));
- StreamEvent.SessionPreparedEvent event = new
StreamEvent.SessionPreparedEvent(planId, sessionInfo);
+ StreamEvent.SessionPreparedEvent event = new
StreamEvent.SessionPreparedEvent(planId, sessionInfo, prepareDirection);
coordinator.addSessionInfo(sessionInfo);
fireStreamEvent(event);
}
@@ -208,6 +214,7 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
synchronized void fireStreamEvent(StreamEvent event)
{
// delegate to listener
+ long startNanos = nanoTime();
for (StreamEventHandler listener : eventListeners)
{
try
@@ -219,6 +226,10 @@ public final class StreamResultFuture extends
AsyncFuture<StreamState>
logger.warn("Unexpected exception in listener while calling
handleStreamEvent", t);
}
}
+ long totalNanos = nanoTime() - startNanos;
+ if (totalNanos > slowEventsLogTimeoutNanos)
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES, "Handling streaming events took longer than {}; took {}",
+ () -> new Object[] {
Duration.ofNanos(slowEventsLogTimeoutNanos), Duration.ofNanos(totalNanos)});
}
private synchronized void maybeComplete()
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java
b/src/java/org/apache/cassandra/streaming/StreamSession.java
index a540a1b6d0..811717f85d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -148,6 +148,8 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
{
private static final Logger logger =
LoggerFactory.getLogger(StreamSession.class);
+ public enum PrepareDirection { SEND, ACK }
+
// for test purpose to record received message and state transition
public volatile static MessageStateSink sink = MessageStateSink.NONE;
@@ -738,7 +740,7 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
for (StreamTransferTask task : transfers.values())
prepareSynAck.summaries.add(task.getSummary());
- streamResult.handleSessionPrepared(this);
+ streamResult.handleSessionPrepared(this, PrepareDirection.SEND);
// After sending the message the initiator can close the channel which
will cause a ClosedChannelException
// in buffer logic, this then gets sent to onError which validates the
state isFinalState, if not fails
// the session. To avoid a race condition between sending and setting
state, make sure to update the state
@@ -769,14 +771,14 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
if (isPreview())
completePreview();
else
- startStreamingFiles(true);
+ startStreamingFiles(PrepareDirection.ACK);
}
private void prepareAck(PrepareAckMessage msg)
{
if (isPreview())
throw new RuntimeException(String.format("[Stream #%s] Cannot
receive PrepareAckMessage for preview session", planId()));
- startStreamingFiles(true);
+ startStreamingFiles(PrepareDirection.ACK);
}
/**
@@ -845,9 +847,13 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
}
}
- public void progress(String filename, ProgressInfo.Direction direction,
long bytes, long total)
+ public void progress(String filename, ProgressInfo.Direction direction,
long bytes, long delta, long total)
{
- ProgressInfo progress = new ProgressInfo(peer, index, filename,
direction, bytes, total);
+ if (delta < 0)
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES,
+ "[id={}, key={{}, {}, {})] Stream event reported
a negative delta ({})",
+ planId(), peer, filename, direction, delta);
+ ProgressInfo progress = new ProgressInfo(peer, index, filename,
direction, bytes, delta, total);
streamResult.handleProgress(progress);
}
@@ -1008,10 +1014,10 @@ public class StreamSession implements
IEndpointStateChangeSubscriber
receivers.put(summary.tableId, new StreamReceiveTask(this,
summary.tableId, summary.files, summary.totalSize));
}
- private void startStreamingFiles(boolean notifyPrepared)
+ private void startStreamingFiles(@Nullable PrepareDirection
prepareDirection)
{
- if (notifyPrepared)
- streamResult.handleSessionPrepared(this);
+ if (prepareDirection != null)
+ streamResult.handleSessionPrepared(this, prepareDirection);
state(State.STREAMING);
diff --git a/src/java/org/apache/cassandra/streaming/StreamingState.java
b/src/java/org/apache/cassandra/streaming/StreamingState.java
index 23685bd0ef..c2eed1ea9e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamingState.java
+++ b/src/java/org/apache/cassandra/streaming/StreamingState.java
@@ -20,16 +20,16 @@ package org.apache.cassandra.streaming;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.InetSocketAddress;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import javax.annotation.concurrent.GuardedBy;
+
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,18 +54,12 @@ public class StreamingState implements StreamEventHandler
private final long createdAtMillis = Clock.Global.currentTimeMillis();
- // while streaming is running, this is a cache of StreamInfo seen with
progress state
- // the reason for the cache is that StreamSession drops data after tasks
(recieve/send) complete, this makes
- // it so that current state of a future tracks work pending rather than
work done, cache solves this by not deleting
- // when tasks complete
- // To lower memory costs, clear this after the stream completes
- private ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress = new
ConcurrentHashMap<>();
-
private final TimeUUID id;
private final boolean follower;
private final StreamOperation operation;
- private Set<InetSocketAddress> peers = null;
- private Sessions sessions = Sessions.EMPTY;
+ private final Set<InetSocketAddress> peers = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ @GuardedBy("this")
+ private final Sessions sessions = new Sessions();
private Status status;
private String completeMessage = null;
@@ -107,13 +101,7 @@ public class StreamingState implements StreamEventHandler
public Set<InetSocketAddress> peers()
{
- Set<InetSocketAddress> peers = this.peers;
- if (peers != null)
- return peers;
- ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
- if (streamProgress != null)
- return streamProgress.keySet();
- return Collections.emptySet();
+ return this.peers;
}
public Status status()
@@ -138,6 +126,7 @@ public class StreamingState implements StreamEventHandler
}
}
+ @VisibleForTesting
public StreamResultFuture future()
{
if (follower)
@@ -225,12 +214,6 @@ public class StreamingState implements StreamEventHandler
@Override
public synchronized void handleStreamEvent(StreamEvent event)
{
- ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
- if (streamProgress == null)
- {
- logger.warn("Got stream event {} after the stream completed",
event.eventType);
- return;
- }
try
{
switch (event.eventType)
@@ -252,52 +235,52 @@ public class StreamingState implements StreamEventHandler
{
logger.warn("Unexpected exception handling stream event", t);
}
- sessions = Sessions.create(streamProgress.values());
lastUpdatedAtNanos = Clock.Global.nanoTime();
}
private void streamPrepared(StreamEvent.SessionPreparedEvent event)
{
- SessionInfo session = new SessionInfo(event.session);
- streamProgress.putIfAbsent(session.peer, session);
+ SessionInfo session = event.session;
+ peers.add(session.peer);
+ // only update stats on ACK to avoid duplication
+ if (event.prepareDirection != StreamSession.PrepareDirection.ACK)
+ return;
+ sessions.bytesToReceive += session.getTotalSizeToReceive();
+ sessions.bytesToSend += session.getTotalSizeToSend();
+
+ sessions.filesToReceive += session.getTotalFilesToReceive();
+ sessions.filesToSend += session.getTotalFilesToSend();
}
private void streamProgress(StreamEvent.ProgressEvent event)
{
- SessionInfo info = streamProgress.get(event.progress.peer);
- if (info != null)
+ ProgressInfo info = event.progress;
+
+ if (info.direction == ProgressInfo.Direction.IN)
{
- info.updateProgress(event.progress);
+ // receiving
+ sessions.bytesReceived += info.deltaBytes;
+ if (info.isCompleted())
+ sessions.filesReceived++;
}
else
{
- logger.warn("[Stream #{}} ID#{}] Recieved stream progress before
prepare; peer={}", id, event.progress.sessionIndex, event.progress.peer);
+ // sending
+ sessions.bytesSent += info.deltaBytes;
+ if (info.isCompleted())
+ sessions.filesSent++;
}
}
@Override
public synchronized void onSuccess(@Nullable StreamState state)
{
- ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
- if (streamProgress != null)
- {
- sessions = Sessions.create(streamProgress.values());
- peers = new HashSet<>(streamProgress.keySet());
- this.streamProgress = null;
- updateState(Status.SUCCESS);
- }
+ updateState(Status.SUCCESS);
}
@Override
public synchronized void onFailure(Throwable throwable)
{
- ConcurrentMap<InetSocketAddress, SessionInfo> streamProgress =
this.streamProgress;
- if (streamProgress != null)
- {
- sessions = Sessions.create(streamProgress.values());
- peers = new HashSet<>(streamProgress.keySet());
- this.streamProgress = null;
- }
completeMessage = Throwables.getStackTraceAsString(throwable);
updateState(Status.FAILURE);
}
@@ -326,24 +309,10 @@ public class StreamingState implements StreamEventHandler
public static class Sessions
{
- public static final Sessions EMPTY = new Sessions(0, 0, 0, 0, 0, 0, 0,
0);
-
- public final long bytesToReceive, bytesReceived;
- public final long bytesToSend, bytesSent;
- public final long filesToReceive, filesReceived;
- public final long filesToSend, filesSent;
-
- public Sessions(long bytesToReceive, long bytesReceived, long
bytesToSend, long bytesSent, long filesToReceive, long filesReceived, long
filesToSend, long filesSent)
- {
- this.bytesToReceive = bytesToReceive;
- this.bytesReceived = bytesReceived;
- this.bytesToSend = bytesToSend;
- this.bytesSent = bytesSent;
- this.filesToReceive = filesToReceive;
- this.filesReceived = filesReceived;
- this.filesToSend = filesToSend;
- this.filesSent = filesSent;
- }
+ public long bytesToReceive, bytesReceived;
+ public long bytesToSend, bytesSent;
+ public long filesToReceive, filesReceived;
+ public long filesToSend, filesSent;
public static String columns()
{
@@ -357,51 +326,9 @@ public class StreamingState implements StreamEventHandler
" files_sent bigint, \n";
}
- public static Sessions create(Collection<SessionInfo> sessions)
- {
- long bytesToReceive = 0;
- long bytesReceived = 0;
- long filesToReceive = 0;
- long filesReceived = 0;
- long bytesToSend = 0;
- long bytesSent = 0;
- long filesToSend = 0;
- long filesSent = 0;
- for (SessionInfo session : sessions)
- {
- bytesToReceive += session.getTotalSizeToReceive();
- bytesReceived += session.getTotalSizeReceived();
-
- filesToReceive += session.getTotalFilesToReceive();
- filesReceived += session.getTotalFilesReceived();
-
- bytesToSend += session.getTotalSizeToSend();
- bytesSent += session.getTotalSizeSent();
-
- filesToSend += session.getTotalFilesToSend();
- filesSent += session.getTotalFilesSent();
- }
- if (0 == bytesToReceive && 0 == bytesReceived && 0 ==
filesToReceive && 0 == filesReceived && 0 == bytesToSend && 0 == bytesSent && 0
== filesToSend && 0 == filesSent)
- return EMPTY;
- return new Sessions(bytesToReceive, bytesReceived,
- bytesToSend, bytesSent,
- filesToReceive, filesReceived,
- filesToSend, filesSent);
- }
-
public boolean isEmpty()
{
- return this == EMPTY;
- }
-
- public BigDecimal receivedBytesPercent()
- {
- return div(bytesReceived, bytesToReceive);
- }
-
- public BigDecimal sentBytesPercent()
- {
- return div(bytesSent, bytesToSend);
+ return bytesToReceive == 0 && bytesToSend == 0 && filesToReceive
== 0 && filesToSend == 0;
}
public BigDecimal progress()
diff --git
a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
index a3eb7d1f34..72ab84407a 100644
---
a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
+++
b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -74,6 +74,8 @@ public class ProgressInfoCompositeData
public static CompositeData toCompositeData(TimeUUID planId, ProgressInfo
progressInfo)
{
+ // Delta is not returned as it wasn't clear what the impact on backwards
compatibility would be; it may be safe to expose.
+ // see CASSANDRA-18110
Map<String, Object> valueMap = new HashMap<>();
valueMap.put(ITEM_NAMES[0], planId.toString());
valueMap.put(ITEM_NAMES[1],
progressInfo.peer.getAddress().getHostAddress());
@@ -103,6 +105,7 @@ public class ProgressInfoCompositeData
(String) values[4],
ProgressInfo.Direction.valueOf((String)values[5]),
(long) values[6],
+ (long) values[6],
(long) values[7]);
}
catch (UnknownHostException e)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
index db896697fb..ee41ec42d7 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/RebuildStreamingTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.distributed.test.streaming;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import org.junit.Test;
@@ -29,28 +30,49 @@ import org.apache.cassandra.distributed.api.Row;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.assertj.core.api.Assertions;
import static org.assertj.core.api.Assertions.assertThat;
public class RebuildStreamingTest extends TestBaseImpl
{
+ private static final ByteBuffer BLOB = ByteBuffer.wrap(new byte[1 << 16]);
+ // zero copy streaming sends all components, so the events will include
non-Data files as well
+ private static final int NUM_COMPONENTS = 7;
+
+ @Test
+ public void zeroCopy() throws IOException
+ {
+ test(true);
+ }
+
@Test
- public void test() throws IOException
+ public void notZeroCopy() throws IOException
+ {
+ test(false);
+ }
+
+ private void test(boolean zeroCopyStreaming) throws IOException
{
try (Cluster cluster = init(Cluster.build(2)
- .withConfig(c ->
c.with(Feature.values()).set("stream_entire_sstables", false))
+ .withConfig(c ->
c.with(Feature.values())
+
.set("stream_entire_sstables",
zeroCopyStreaming).set("streaming_slow_events_log_timeout", "0s"))
.start()))
{
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id
varchar PRIMARY KEY);"));
+ // streaming sends events every 65k, so need to make sure that the
files are larger than this to hit
+ // all cases of the vtable
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id
varchar, spacing blob, PRIMARY KEY (user_id)) WITH compression = { 'enabled' :
false };"));
cluster.stream().forEach(i ->
i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
IInvokableInstance first = cluster.get(1);
IInvokableInstance second = cluster.get(2);
long expectedFiles = 10;
for (int i = 0; i < expectedFiles; i++)
{
- first.executeInternal(withKeyspace("insert into
%s.users(user_id) values (?)"), "dcapwell" + i);
+ first.executeInternal(withKeyspace("insert into
%s.users(user_id, spacing) values (?, ? )"), "dcapwell" + i, BLOB);
first.flush(KEYSPACE);
}
+ if (zeroCopyStreaming) // will include all components, so need to
account for them
+ expectedFiles *= NUM_COMPONENTS;
second.nodetoolResult("rebuild", "--keyspace",
KEYSPACE).asserts().success();
@@ -91,6 +113,9 @@ public class RebuildStreamingTest extends TestBaseImpl
.columnsEqualTo("files_to_receive",
"files_received").isEqualTo("files_received", expectedFiles)
.columnsEqualTo("bytes_to_receive",
"bytes_received").isEqualTo("bytes_received", totalBytes)
.columnsEqualTo("files_sent", "files_to_send",
"bytes_sent", "bytes_to_send").isEqualTo("files_sent", 0L);
+
+ // did we trigger slow event log?
+ cluster.forEach(i -> Assertions.assertThat(i.logs().grep("Handling
streaming events took longer than").getResult()).describedAs("Unable to find
slow log for node%d", i.config().num()).isNotEmpty());
}
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java
new file mode 100644
index 0000000000..f3d4394d21
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/streaming/StreamingStatsDisabledTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.streaming;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.distributed.util.QueryResultUtil;
+import org.apache.cassandra.streaming.StreamManager;
+
+public class StreamingStatsDisabledTest extends TestBaseImpl
+{
+ @Test
+ public void test() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(c ->
c.with(Feature.values()).set("streaming_stats_enabled", false))
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.users (user_id
varchar, PRIMARY KEY (user_id));"));
+ cluster.stream().forEach(i ->
i.nodetoolResult("disableautocompaction", KEYSPACE).asserts().success());
+
+ long expectedFiles = 10;
+ for (int i = 0; i < expectedFiles; i++)
+ {
+ cluster.get(1).executeInternal(withKeyspace("insert into
%s.users(user_id) values (?)"), "dcapwell" + i);
+ cluster.get(1).flush(KEYSPACE);
+ }
+
+ cluster.get(2).nodetoolResult("rebuild", "--keyspace",
KEYSPACE).asserts().success();
+ for (int nodeId : Arrays.asList(1, 2))
+
QueryResultUtil.assertThat(cluster.get(nodeId).executeInternalWithResult("SELECT
* FROM system_views.streaming")).isEmpty();
+
+ // trigger streaming again
+ cluster.get(1).executeInternal(withKeyspace("INSERT INTO
%s.users(user_id) VALUES ('trigger streaming')"));
+ // mimic JMX
+ cluster.get(2).runOnInstance(() ->
StreamManager.instance.setStreamingStatsEnabled(true));
+ cluster.get(2).nodetoolResult("repair",
KEYSPACE).asserts().success();
+
+
QueryResultUtil.assertThat(cluster.get(1).executeInternalWithResult("SELECT *
FROM system_views.streaming")).isEmpty();
+
QueryResultUtil.assertThat(cluster.get(2).executeInternalWithResult("SELECT *
FROM system_views.streaming")).hasSize(1);
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
index 58842bc691..a502e8c1e6 100644
---
a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
+++
b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
import java.util.Objects;
import java.util.function.Predicate;
+import com.google.monitoring.runtime.instrumentation.common.collect.Iterators;
import org.apache.cassandra.distributed.api.Row;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
@@ -176,5 +177,11 @@ public class QueryResultUtil
Assertions.assertThat(qr.toObjectArrays()).hasSizeGreaterThan(size);
return this;
}
+
+ public void isEmpty()
+ {
+ int size = Iterators.size(qr);
+ Assertions.assertThat(size).describedAs("QueryResult is not
empty").isEqualTo(0);
+ }
}
}
diff --git
a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
index cce995688e..c8e3d89f6b 100644
--- a/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/StreamingVirtualTableTest.java
@@ -18,9 +18,13 @@
package org.apache.cassandra.db.virtual;
import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Date;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -34,9 +38,11 @@ import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.ProgressInfo.Direction;
import org.apache.cassandra.streaming.SessionInfo;
import org.apache.cassandra.streaming.StreamCoordinator;
import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEvent.ProgressEvent;
import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.streaming.StreamResultFuture;
@@ -90,7 +96,7 @@ public class StreamingVirtualTableTest extends CQLTester
assertRows(execute(t("select id, follower, operation, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
new Object[] { state.id(), true, "Repair",
Collections.emptyList(), "start", 0F, new Date(state.lastUpdatedAtMillis()),
null, null });
- state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.PREPARING)));
+ state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), new SessionInfo(PEER2, 1, PEER1,
Collections.emptyList(), Collections.emptyList(),
StreamSession.State.PREPARING), StreamSession.PrepareDirection.ACK));
state.onSuccess(new StreamState(state.id(), StreamOperation.REPAIR,
ImmutableSet.of(new SessionInfo(PEER2, 1, PEER1, Collections.emptyList(),
Collections.emptyList(), StreamSession.State.COMPLETE))));
assertRows(execute(t("select id, follower, operation, peers, status,
progress_percentage, last_updated_at, failure_cause, success_message from %s")),
@@ -118,8 +124,9 @@ public class StreamingVirtualTableTest extends CQLTester
SessionInfo s1 = new SessionInfo(PEER2, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
SessionInfo s2 = new SessionInfo(PEER3, 0,
FBUtilities.getBroadcastAddressAndPort(), Arrays.asList(streamSummary()),
Arrays.asList(streamSummary(), streamSummary()), StreamSession.State.PREPARING);
- state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), s1));
- state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), s2));
+ // we only update stats on ACK
+ state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), s1,
StreamSession.PrepareDirection.ACK));
+ state.handleStreamEvent(new
StreamEvent.SessionPreparedEvent(state.id(), s2,
StreamSession.PrepareDirection.ACK));
long bytesToReceive = 0, bytesToSend = 0;
long filesToReceive = 0, filesToSend = 0;
@@ -133,31 +140,34 @@ public class StreamingVirtualTableTest extends CQLTester
assertRows(execute(t("select id, follower, peers, status,
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send,
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from
%s")),
new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", 0F, bytesToReceive,
0L, bytesToSend, 0L, filesToReceive, 0L, filesToSend, 0L });
- // update progress
+ // update progress; sent all but 1 file
long bytesReceived = 0, bytesSent = 0;
+ long filesReceived = 0, filesSent = 0;
for (SessionInfo s : Arrays.asList(s1, s2))
{
- long in = s.getTotalFilesToReceive() - 1;
- long inBytes = s.getTotalSizeToReceive() - in;
- long out = s.getTotalFilesToSend() - 1;
- long outBytes = s.getTotalSizeToSend() - out;
- state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(),
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0",
ProgressInfo.Direction.IN, inBytes, inBytes)));
- state.handleStreamEvent(new StreamEvent.ProgressEvent(state.id(),
new ProgressInfo((InetAddressAndPort) s.peer, 0, "0",
ProgressInfo.Direction.OUT, outBytes, outBytes)));
- bytesReceived += inBytes;
- bytesSent += outBytes;
+ List<StreamSummary> receiving =
deterministic(s.receivingSummaries);
+ bytesReceived += progressEvent(state, s, receiving, Direction.IN);
+ filesReceived += receiving.stream().mapToInt(ss -> ss.files -
1).sum();
+
+ List<StreamSummary> sending = deterministic(s.sendingSummaries);
+ bytesSent += progressEvent(state, s, sending, Direction.OUT);
+ filesSent += sending.stream().mapToInt(ss -> ss.files - 1).sum();
}
assertRows(execute(t("select id, follower, peers, status,
bytes_to_receive, bytes_received, bytes_to_send, bytes_sent, files_to_receive,
files_received, files_to_send, files_sent from %s")),
- new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive,
bytesReceived, bytesToSend, bytesSent, filesToReceive, 2L, filesToSend, 2L });
+ new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "start", bytesToReceive,
bytesReceived, bytesToSend, bytesSent, filesToReceive, filesReceived,
filesToSend, filesSent });
// finish
for (SessionInfo s : Arrays.asList(s1, s2))
{
// complete the rest
- for (long i = 1; i < s.getTotalFilesToReceive(); i++)
- state.handleStreamEvent(new
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort)
s.peer, 0, Long.toString(i), ProgressInfo.Direction.IN, 1, 1)));
- for (long i = 1; i < s.getTotalFilesToSend(); i++)
- state.handleStreamEvent(new
StreamEvent.ProgressEvent(state.id(), new ProgressInfo((InetAddressAndPort)
s.peer, 0, Long.toString(i), ProgressInfo.Direction.OUT, 1, 1)));
+ List<StreamSummary> receiving =
deterministic(s.receivingSummaries);
+ bytesReceived += completeEvent(state, s, receiving, Direction.IN);
+ filesReceived += receiving.stream().mapToInt(ss -> ss.files -
1).sum();
+
+ List<StreamSummary> sending = deterministic(s.sendingSummaries);
+ bytesSent += completeEvent(state, s, sending, Direction.OUT);
+ filesSent += sending.stream().mapToInt(ss -> ss.files - 1).sum();
}
assertRows(execute(t("select id, follower, peers, status,
progress_percentage, bytes_to_receive, bytes_received, bytes_to_send,
bytes_sent, files_to_receive, files_received, files_to_send, files_sent from
%s")),
@@ -168,6 +178,47 @@ public class StreamingVirtualTableTest extends CQLTester
new Object[] { state.id(), follower,
Arrays.asList(PEER2.toString(), PEER3.toString()), "success", 100F, new
Date(state.lastUpdatedAtMillis()), null, null });
}
+ private static long progressEvent(StreamingState state, SessionInfo s,
List<StreamSummary> summaries, Direction direction)
+ {
+ long counter = 0;
+ for (StreamSummary summary : summaries)
+ {
+ long fileSize = summary.totalSize / summary.files;
+ for (int i = 0; i < summary.files - 1; i++)
+ {
+ String fileName = summary.tableId + "-" +
direction.name().toLowerCase() + "-" + i;
+ state.handleStreamEvent(new ProgressEvent(state.id(), new
ProgressInfo((InetAddressAndPort) s.peer, 0, fileName, direction, fileSize,
fileSize, fileSize)));
+ counter += fileSize;
+ }
+ }
+ return counter;
+ }
+
+ private static long completeEvent(StreamingState state, SessionInfo s,
List<StreamSummary> summaries, Direction direction)
+ {
+ long counter = 0;
+ for (StreamSummary summary : summaries)
+ {
+ long fileSize = summary.totalSize / summary.files;
+ String fileName = summary.tableId + "-" +
direction.name().toLowerCase() + "-" + summary.files;
+ state.handleStreamEvent(new ProgressEvent(state.id(), new
ProgressInfo((InetAddressAndPort) s.peer, 0, fileName, direction, fileSize,
fileSize, fileSize)));
+ counter += fileSize;
+ }
+ return counter;
+ }
+
+ private List<StreamSummary> deterministic(Collection<StreamSummary>
summaries)
+ {
+ // SessionInfo uses an ImmutableSet... so create a list
+ List<StreamSummary> list = new ArrayList<>(summaries);
+ // need to order so all calls with the same input return the same order
+ // if duplicates are found, the object order may be different but the
contents will match
+ Collections.sort(list, Comparator.comparing((StreamSummary a) ->
a.tableId.asUUID())
+ .thenComparingInt(a -> a.files)
+ .thenComparingLong(a -> a.totalSize));
+ return list;
+ }
+
private static StreamSummary streamSummary()
{
int files = ThreadLocalRandom.current().nextInt(2, 10);
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 4f0c49468d..45172fe14c 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -57,13 +57,13 @@ public class SessionInfoTest
assert info.getTotalFilesSent() == 0;
// receive in progress
- info.updateProgress(new ProgressInfo(local, 0, "test.txt",
ProgressInfo.Direction.IN, 50, 100));
+ info.updateProgress(new ProgressInfo(local, 0, "test.txt",
ProgressInfo.Direction.IN, 50, 50, 100));
// still in progress, but not completed yet
assert info.getTotalSizeReceived() == 50;
assert info.getTotalSizeSent() == 0;
assert info.getTotalFilesReceived() == 0;
assert info.getTotalFilesSent() == 0;
- info.updateProgress(new ProgressInfo(local, 0, "test.txt",
ProgressInfo.Direction.IN, 100, 100));
+ info.updateProgress(new ProgressInfo(local, 0, "test.txt",
ProgressInfo.Direction.IN, 100, 100, 100));
// 1 file should be completed
assert info.getTotalSizeReceived() == 100;
assert info.getTotalSizeSent() == 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]