This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new b936d3e Show the progress of data streaming and index build
b936d3e is described below
commit b936d3e95db3180a2f1f32ed7c80543e81971fde
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Fri Sep 18 11:40:19 2020 -0700
Show the progress of data streaming and index build
patch by Stefan Miklosovic; reviewed by Benjamin Lerer, Berenguer Blasi,
David Capwell for CASSANDRA-15406
---
CHANGES.txt | 1 +
.../streaming/CassandraCompressedStreamReader.java | 2 +-
.../db/streaming/CassandraStreamReader.java | 2 +-
.../apache/cassandra/streaming/ProgressInfo.java | 2 +-
.../cassandra/streaming/StreamReceiveTask.java | 4 +-
.../apache/cassandra/tools/nodetool/NetStats.java | 64 ++-
.../cassandra/distributed/impl/Instance.java | 14 +-
.../shared/NodeToolResultWithOutput.java | 48 ++
.../test/AbstractNetstatsBootstrapStreaming.java | 85 ++++
.../test/AbstractNetstatsStreaming.java | 548 +++++++++++++++++++++
...WithEntireSSTablesCompressionStreamingTest.java | 36 ++
...houtEntireSSTablesCompressionStreamingTest.java | 36 ++
.../test/NetstatsRepairStreamingTest.java | 88 ++++
.../cassandra/distributed/util/NodetoolUtils.java | 68 +++
14 files changed, 978 insertions(+), 20 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4916faa..42b7aa7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
* Make Table/Keyspace Metric Names Consistent With Each Other
(CASSANDRA-15909)
* Mutating sstable component may race with entire-sstable-streaming(ZCS)
causing checksum validation failure (CASSANDRA-15861)
* NPE thrown while updating speculative execution time if keyspace is removed
during task execution (CASSANDRA-15949)
+ * Show the progress of data streaming and index build (CASSANDRA-15406)
Merged from 3.11:
* Use IF NOT EXISTS for index and UDT create statements in snapshot schema
files (CASSANDRA-13935)
* Make sure LCS handles duplicate sstable added/removed notifications
correctly (CASSANDRA-14103)
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
index 2491fe1..ff9e6f7 100644
---
a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
+++
b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java
@@ -97,7 +97,7 @@ public class CassandraCompressedStreamReader extends
CassandraStreamReader
{
writePartition(deserializer, writer);
// when compressed, report total bytes of compressed
chunks read since remoteFile.size is the sum of chunks transferred
- session.progress(filename, ProgressInfo.Direction.IN,
cis.chunkBytesRead(), totalSize);
+ session.progress(filename + '-' + fileSeqNum,
ProgressInfo.Direction.IN, cis.chunkBytesRead(), totalSize);
}
assert in.getBytesRead() == sectionLength;
}
diff --git
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 686d874..6835fad 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -124,7 +124,7 @@ public class CassandraStreamReader implements IStreamReader
{
writePartition(deserializer, writer);
// TODO move this to BytesReadTracker
- session.progress(writer.getFilename(),
ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
+ session.progress(writer.getFilename() + '-' + fileSeqNum,
ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
}
logger.debug("[Stream #{}] Finished receiving file #{} from {}
readBytes = {}, totalSize = {}",
session.planId(), fileSeqNum, session.peer,
FBUtilities.prettyPrintMemory(in.getBytesRead()),
FBUtilities.prettyPrintMemory(totalSize));
diff --git a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
index ac91855..2b306f8 100644
--- a/src/java/org/apache/cassandra/streaming/ProgressInfo.java
+++ b/src/java/org/apache/cassandra/streaming/ProgressInfo.java
@@ -110,7 +110,7 @@ public class ProgressInfo implements Serializable
{
StringBuilder sb = new StringBuilder(fileName);
sb.append(" ").append(currentBytes);
- sb.append("/").append(totalBytes).append(" bytes");
+ sb.append("/").append(totalBytes).append(" bytes ");
sb.append("(").append(currentBytes*100/totalBytes).append("%) ");
sb.append(direction == Direction.OUT ? "sent to " : "received from ");
sb.append("idx:").append(sessionIndex);
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 25977a5..d127edb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -85,8 +85,8 @@ public class StreamReceiveTask extends StreamTask
remoteStreamsReceived += stream.getNumFiles();
bytesReceived += stream.getSize();
Preconditions.checkArgument(tableId.equals(stream.getTableId()));
- logger.debug("received {} of {} total files {} of total bytes {}",
remoteStreamsReceived, totalStreams,
- bytesReceived, totalSize);
+ logger.debug("received {} of {} total files, {} of total bytes {}",
remoteStreamsReceived, totalStreams,
+ bytesReceived, stream.getSize());
receiver.received(stream);
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
index c0500ca..e86505b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NetStats.java
@@ -59,10 +59,29 @@ public class NetStats extends NodeToolCmd
System.out.printf("%n");
if (!info.receivingSummaries.isEmpty())
{
+ long totalFilesToReceive = info.getTotalFilesToReceive();
+ long totalBytesToReceive = info.getTotalSizeToReceive();
+ long totalFilesReceived = info.getTotalFilesReceived();
+ long totalSizeReceived = info.getTotalSizeReceived();
+ double percentageFilesReceived = ((double)
totalFilesReceived / totalFilesToReceive) * 100;
+ double percentageSizesReceived = ((double)
totalSizeReceived / totalBytesToReceive) * 100;
+
if (humanReadable)
- System.out.printf(" Receiving %d files, %s
total. Already received %d files, %s total%n", info.getTotalFilesToReceive(),
FileUtils.stringifyFileSize(info.getTotalSizeToReceive()),
info.getTotalFilesReceived(),
FileUtils.stringifyFileSize(info.getTotalSizeReceived()));
+ System.out.printf(" Receiving %d files, %s
total. Already received %d files (%.2f%%), %s total (%.2f%%)%n",
+ totalFilesToReceive,
+
FileUtils.stringifyFileSize(totalBytesToReceive),
+ totalFilesReceived,
+ percentageFilesReceived,
+
FileUtils.stringifyFileSize(totalSizeReceived),
+ percentageSizesReceived);
else
- System.out.printf(" Receiving %d files, %d
bytes total. Already received %d files, %d bytes total%n",
info.getTotalFilesToReceive(), info.getTotalSizeToReceive(),
info.getTotalFilesReceived(), info.getTotalSizeReceived());
+ System.out.printf(" Receiving %d files, %d
bytes total. Already received %d files (%.2f%%), %d bytes total (%.2f%%)%n",
+ totalFilesToReceive,
+ totalBytesToReceive,
+ totalFilesReceived,
+ percentageFilesReceived,
+ totalSizeReceived,
+ percentageSizesReceived);
for (ProgressInfo progress : info.getReceivingFiles())
{
System.out.printf(" %s%n",
progress.toString(printPort));
@@ -70,10 +89,29 @@ public class NetStats extends NodeToolCmd
}
if (!info.sendingSummaries.isEmpty())
{
+ long totalFilesToSend = info.getTotalFilesToSend();
+ long totalSizeToSend = info.getTotalSizeToSend();
+ long totalFilesSent = info.getTotalFilesSent();
+ long totalSizeSent = info.getTotalSizeSent();
+ double percentageFilesSent = ((double) totalFilesSent /
totalFilesToSend) * 100;
+ double percentageSizeSent = ((double) totalSizeSent /
totalSizeToSend) * 100;
+
if (humanReadable)
- System.out.printf(" Sending %d files, %s total.
Already sent %d files, %s total%n", info.getTotalFilesToSend(),
FileUtils.stringifyFileSize(info.getTotalSizeToSend()),
info.getTotalFilesSent(), FileUtils.stringifyFileSize(info.getTotalSizeSent()));
+ System.out.printf(" Sending %d files, %s total.
Already sent %d files (%.2f%%), %s total (%.2f%%)%n",
+ totalFilesToSend,
+
FileUtils.stringifyFileSize(totalSizeToSend),
+ totalFilesSent,
+ percentageFilesSent,
+
FileUtils.stringifyFileSize(totalSizeSent),
+ percentageSizeSent);
else
- System.out.printf(" Sending %d files, %d bytes
total. Already sent %d files, %d bytes total%n", info.getTotalFilesToSend(),
info.getTotalSizeToSend(), info.getTotalFilesSent(), info.getTotalSizeSent());
+ System.out.printf(" Sending %d files, %d bytes
total. Already sent %d files (%.2f%%), %d bytes total (%.2f%%) %n",
+ totalFilesToSend,
+ totalSizeToSend,
+ totalFilesSent,
+ percentageFilesSent,
+ totalSizeSent,
+ percentageSizeSent);
for (ProgressInfo progress : info.getSendingFiles())
{
System.out.printf(" %s%n",
progress.toString(printPort));
@@ -98,35 +136,35 @@ public class NetStats extends NodeToolCmd
long dropped;
pending = 0;
- for (int n : ms.getLargeMessagePendingTasks().values())
+ for (int n : ms.getLargeMessagePendingTasksWithPort().values())
pending += n;
completed = 0;
- for (long n : ms.getLargeMessageCompletedTasks().values())
+ for (long n : ms.getLargeMessageCompletedTasksWithPort().values())
completed += n;
dropped = 0;
- for (long n : ms.getLargeMessageDroppedTasks().values())
+ for (long n : ms.getLargeMessageDroppedTasksWithPort().values())
dropped += n;
System.out.printf("%-25s%10s%10s%15s%10s%n", "Large messages",
"n/a", pending, completed, dropped);
pending = 0;
- for (int n : ms.getSmallMessagePendingTasks().values())
+ for (int n : ms.getSmallMessagePendingTasksWithPort().values())
pending += n;
completed = 0;
- for (long n : ms.getSmallMessageCompletedTasks().values())
+ for (long n : ms.getSmallMessageCompletedTasksWithPort().values())
completed += n;
dropped = 0;
- for (long n : ms.getSmallMessageDroppedTasks().values())
+ for (long n : ms.getSmallMessageDroppedTasksWithPort().values())
dropped += n;
System.out.printf("%-25s%10s%10s%15s%10s%n", "Small messages",
"n/a", pending, completed, dropped);
pending = 0;
- for (int n : ms.getGossipMessagePendingTasks().values())
+ for (int n : ms.getGossipMessagePendingTasksWithPort().values())
pending += n;
completed = 0;
- for (long n : ms.getGossipMessageCompletedTasks().values())
+ for (long n : ms.getGossipMessageCompletedTasksWithPort().values())
completed += n;
dropped = 0;
- for (long n : ms.getGossipMessageDroppedTasks().values())
+ for (long n : ms.getGossipMessageDroppedTasksWithPort().values())
dropped += n;
System.out.printf("%-25s%10s%10s%15s%10s%n", "Gossip messages",
"n/a", pending, completed, dropped);
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 5395cb8..038698b 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -605,18 +605,28 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
}).call();
}
- private static class DTestNodeTool extends NodeTool {
+ public static class DTestNodeTool extends NodeTool {
private final StorageServiceMBean storageProxy;
private final CollectingNotificationListener notifications = new
CollectingNotificationListener();
private Throwable latestError;
- DTestNodeTool(boolean withNotifications) {
+ public DTestNodeTool(boolean withNotifications) {
super(new InternalNodeProbeFactory(withNotifications));
storageProxy = new
InternalNodeProbe(withNotifications).getStorageService();
storageProxy.addNotificationListener(notifications, null, null);
}
+ public List<Notification> getNotifications()
+ {
+ return new ArrayList<>(notifications.notifications);
+ }
+
+ public Throwable getLatestError()
+ {
+ return latestError;
+ }
+
public int execute(String... args)
{
try
diff --git
a/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java
b/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java
new file mode 100644
index 0000000..cb94887
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/shared/NodeToolResultWithOutput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.cassandra.distributed.api.NodeToolResult;
+
+public class NodeToolResultWithOutput
+{
+ private final NodeToolResult result;
+ private final ByteArrayOutputStream stdout;
+ private final ByteArrayOutputStream stderr;
+
+ public NodeToolResultWithOutput(NodeToolResult result,
ByteArrayOutputStream stdout, ByteArrayOutputStream stderr) {
+ this.result = result;
+ this.stdout = stdout;
+ this.stderr = stderr;
+ }
+
+ public NodeToolResult getResult() {
+ return this.result;
+ }
+
+ public String getStdout() {
+ return this.stdout.toString();
+ }
+
+ public String getStderr() {
+ return this.stderr.toString();
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
new file mode 100644
index 0000000..7aca7bd
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsBootstrapStreaming.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public abstract class AbstractNetstatsBootstrapStreaming extends
AbstractNetstatsStreaming
+{
+ protected void executeTest(final boolean streamEntireSSTables,
+ final boolean compressionEnabled) throws
Exception
+ {
+ final Cluster.Builder builder = builder().withNodes(1)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(2))
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
+
.set("stream_throughput_outbound_megabits_per_sec", 1)
+
.set("compaction_throughput_mb_per_sec", 1)
+
.set("stream_entire_sstables", streamEntireSSTables));
+
+ try (final Cluster cluster = builder.withNodes(1).start())
+ {
+ // populate data only against 1 node first
+
+ createTable(cluster, 1, compressionEnabled);
+
+ cluster.get(1).nodetoolResult("disableautocompaction",
"netstats_test").asserts().success();
+
+ if (compressionEnabled)
+ {
+ populateData(true);
+ }
+ else
+ {
+ populateData(false);
+ }
+
+ cluster.get(1).flush("netstats_test");
+
+ // then bootstrap the second one, upon joining,
+ // we should see that netstats shows how SSTables are being
streamed on the first node
+
+ final IInstanceConfig config = cluster.newInstanceConfig();
+ config.set("auto_bootstrap", true);
+
+ IInvokableInstance secondNode = cluster.bootstrap(config);
+
+ final Future<?> startupRunnable =
executorService.submit((Runnable) secondNode::startup);
+ final Future<AbstractNetstatsStreaming.NetstatResults>
netstatsFuture = executorService.submit(new NetstatsCallable(cluster.get(1)));
+
+ final AbstractNetstatsStreaming.NetstatResults results =
netstatsFuture.get(1, MINUTES);
+ startupRunnable.get(2, MINUTES);
+
+ results.assertSuccessful();
+
+
AbstractNetstatsStreaming.NetstatsOutputParser.validate(AbstractNetstatsStreaming.NetstatsOutputParser.parse(results));
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java
b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java
new file mode 100644
index 0000000..85e3e23
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/AbstractNetstatsStreaming.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NodeToolResultWithOutput;
+import org.apache.cassandra.distributed.util.NodetoolUtils;
+import org.apache.cassandra.utils.Pair;
+
+import static java.util.stream.Collectors.toList;
+
+public abstract class AbstractNetstatsStreaming extends TestBaseImpl
+{
+ protected static final Logger logger =
LoggerFactory.getLogger(AbstractNetstatsStreaming.class);
+
+ protected ExecutorService executorService;
+
+ @Before
+ public void setup()
+ {
+ executorService = Executors.newCachedThreadPool();
+ }
+
+ @After
+ public void teardown() throws Exception
+ {
+ try
+ {
+ executorService.shutdownNow();
+
+ if (!executorService.isShutdown())
+ {
+ if (!executorService.awaitTermination(1, TimeUnit.MINUTES))
+ {
+ throw new IllegalStateException("Unable to shutdown
executor for invoking netstat commands.");
+ }
+ }
+ }
+ finally
+ {
+ executorService = null;
+ }
+ }
+
+ protected void changeReplicationFactor()
+ {
+ try (com.datastax.driver.core.Cluster c =
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session s = c.connect())
+ {
+ s.execute("ALTER KEYSPACE netstats_test WITH replication =
{'class': 'SimpleStrategy', 'replication_factor': 2 };");
+ }
+ }
+
+ protected void createTable(Cluster cluster, int replicationFactor, boolean
compressionEnabled)
+ {
+ // replication factor is 1
+ cluster.schemaChange("CREATE KEYSPACE netstats_test WITH replication =
{'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
+
+ if (compressionEnabled)
+ {
+ cluster.schemaChange("CREATE TABLE netstats_test.test_table (id
uuid primary key) WITH compression = {'enabled':'true', 'class':
'LZ4Compressor'};");
+ }
+ else
+ {
+ cluster.schemaChange("CREATE TABLE netstats_test.test_table (id
uuid primary key) WITH compression = {'enabled':'false'};");
+ }
+ }
+
+ protected void populateData(boolean forCompressedTest)
+ {
+ try (com.datastax.driver.core.Cluster c =
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
+ Session s = c.connect("netstats_test"))
+ {
+ int records = forCompressedTest ? 100_000 : 70_000;
+
+ for (int i = 0; i < records; i++)
+ {
+ s.execute("INSERT INTO test_table (id) VALUES (" +
UUID.randomUUID() + ")");
+ }
+ }
+ }
+
+ protected static class NetstatsOutputParser
+ {
+ public static List<Pair<ReceivingStastistics, SendingStatistics>>
parse(final NetstatResults results)
+ {
+ final Set<String> outputs = new LinkedHashSet<>();
+
+ results.netstatOutputs.stream()
+ .map(NodeToolResultWithOutput::getStdout)
+ .filter(output -> !output.contains("Not
sending any streams"))
+ .filter(output ->
output.contains("Receiving") || output.contains("Sending"))
+ .forEach(outputs::add);
+
+ final List<Pair<ReceivingStastistics, SendingStatistics>> parsed =
new ArrayList<>();
+
+ for (final String output : outputs)
+ {
+ boolean processingReceiving = false;
+ boolean processingSending = false;
+
+ final ReceivingStastistics receivingStastistics = new
ReceivingStastistics();
+ final SendingStatistics sendingStatistics = new
SendingStatistics();
+
+ final List<String> sanitisedOutput =
Stream.of(output.split("\n"))
+ .map(String::trim)
+ .filter(line ->
!line.isEmpty())
+ // sometimes logs
are mangled into output
+ .filter(line ->
Stream.of("DEBUG", "INFO", "ERROR", "WARN").noneMatch(line::contains))
+ .filter(line ->
Stream.of("Mode:", "Read", "Attempted", "Mismatch", "Pool", "Large", "Small",
"Gossip").noneMatch(line::startsWith))
+ .collect(toList());
+
+ for (final String outputLine : sanitisedOutput)
+ {
+ if (outputLine.startsWith("Receiving"))
+ {
+ processingReceiving = true;
+ processingSending = false;
+
+ receivingStastistics.parseHeader(outputLine);
+ }
+ else if (outputLine.startsWith("Sending"))
+ {
+ processingSending = true;
+ processingReceiving = false;
+
+ sendingStatistics.parseHeader(outputLine);
+ }
+ else if (processingReceiving)
+ {
+ receivingStastistics.parseTable(outputLine);
+ }
+ else if (processingSending)
+ {
+ sendingStatistics.parseTable(outputLine);
+ }
+ }
+
+ parsed.add(Pair.create(receivingStastistics,
sendingStatistics));
+ }
+
+ return parsed;
+ }
+
+ public static void validate(List<Pair<ReceivingStastistics,
SendingStatistics>> result)
+ {
+ List<SendingStatistics> sendingStatistics =
result.stream().map(pair -> pair.right).collect(toList());
+
+ if (sendingStatistics.size() >= 2)
+ {
+ for (int i = 0; i < sendingStatistics.size() - 1; i++)
+ {
+ SendingStatistics.SendingHeader header1 =
sendingStatistics.get(i).sendingHeader;
+ SendingStatistics.SendingHeader header2 =
sendingStatistics.get(i + 1).sendingHeader;
+
+ if (header1 != null && header2 != null)
+ {
+ Assert.assertTrue(header1.compareTo(header2) <= 0);
+ }
+ }
+ }
+
+ for (SendingStatistics sending : sendingStatistics)
+ {
+ if (sending.sendingHeader != null)
+ {
+ Assert.assertEquals(sending.sendingHeader.bytesTotalSoFar,
(long) sending.sendingSSTable.stream().map(table ->
table.bytesSent).reduce(Long::sum).orElseGet(() -> 0L));
+ Assert.assertTrue(sending.sendingHeader.bytesTotal >=
sending.sendingSSTable.stream().map(table ->
table.bytesInTotal).reduce(Long::sum).orElseGet(() -> 0L));
+
+ if (sending.sendingHeader.bytesTotalSoFar != 0)
+ {
+ double progress = (double)
sending.sendingSSTable.stream().map(table ->
table.bytesSent).reduce(Long::sum).orElse(0L) / (double)
sending.sendingHeader.bytesTotal;
+
+ Assert.assertTrue((int)
sending.sendingHeader.progressBytes >= (int) (progress * 100));
+
+ Assert.assertTrue((double)
sending.sendingHeader.bytesTotal >= (double)
sending.sendingSSTable.stream().map(table ->
table.bytesInTotal).reduce(Long::sum).orElse(0L));
+ }
+ }
+ }
+
+ List<ReceivingStastistics> receivingStastistics =
result.stream().map(pair -> pair.left).collect(toList());
+
+ for (ReceivingStastistics receiving : receivingStastistics)
+ {
+ if (receiving.receivingHeader != null)
+ {
+ Assert.assertTrue(receiving.receivingHeader.bytesTotal >=
receiving.receivingTables.stream().map(table ->
table.receivedSoFar).reduce(Long::sum).orElse(0L));
+
Assert.assertEquals(receiving.receivingHeader.bytesTotalSoFar, (long)
receiving.receivingTables.stream().map(table ->
table.receivedSoFar).reduce(Long::sum).orElse(0L));
+ }
+ }
+ }
+
+ public static class ReceivingStastistics
+ {
+ public ReceivingHeader receivingHeader;
+ public List<ReceivingTable> receivingTables = new ArrayList<>();
+
+ public void parseHeader(String header)
+ {
+ receivingHeader = ReceivingHeader.parseHeader(header);
+ }
+
+ public void parseTable(String table)
+ {
+ receivingTables.add(ReceivingTable.parseTable(table));
+ }
+
+ public String toString()
+ {
+ return "ReceivingStastistics{" +
+ "receivingHeader=" + receivingHeader +
+ ", receivingTables=" + receivingTables +
+ '}';
+ }
+
+ public static class ReceivingHeader
+ {
+ private static final Pattern receivingHeaderPattern =
Pattern.compile(
+ "Receiving (.*) files, (.*) bytes total. Already received (.*)
files \\((.*)%\\), (.*) bytes total \\((.*)%\\)"
+ );
+
+ int totalReceiving = 0;
+ long bytesTotal = 0;
+ int alreadyReceived = 0;
+ double progressFiles = 0.0f;
+ long bytesTotalSoFar = 0;
+ double progressBytes = 0.0f;
+
+ public static ReceivingHeader parseHeader(String header)
+ {
+ final Matcher matcher =
receivingHeaderPattern.matcher(header);
+
+ if (matcher.matches())
+ {
+ final ReceivingHeader receivingHeader = new
ReceivingHeader();
+
+ receivingHeader.totalReceiving =
Integer.parseInt(matcher.group(1));
+ receivingHeader.bytesTotal =
Long.parseLong(matcher.group(2));
+ receivingHeader.alreadyReceived =
Integer.parseInt(matcher.group(3));
+ receivingHeader.progressFiles =
Double.parseDouble(matcher.group(4));
+ receivingHeader.bytesTotalSoFar =
Long.parseLong(matcher.group(5));
+ receivingHeader.progressBytes =
Double.parseDouble(matcher.group(6));
+
+ return receivingHeader;
+ }
+
+ throw new IllegalStateException("Header does not match - "
+ header);
+ }
+
+ public String toString()
+ {
+ return "ReceivingHeader{" +
+ "totalReceiving=" + totalReceiving +
+ ", bytesTotal=" + bytesTotal +
+ ", alreadyReceived=" + alreadyReceived +
+ ", progressFiles=" + progressFiles +
+ ", bytesTotalSoFar=" + bytesTotalSoFar +
+ ", progressBytes=" + progressBytes +
+ '}';
+ }
+ }
+
+ public static class ReceivingTable
+ {
+ long receivedSoFar = 0;
+ long toReceive = 0;
+ double progress = 0.0;
+
+ private static final Pattern recievingFilePattern =
Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) received from (.*)");
+
+ public static ReceivingTable parseTable(String table)
+ {
+ final Matcher matcher =
recievingFilePattern.matcher(table);
+
+ if (matcher.matches())
+ {
+ final ReceivingTable receivingTable = new
ReceivingTable();
+
+ receivingTable.receivedSoFar =
Long.parseLong(matcher.group(2));
+ receivingTable.toReceive =
Long.parseLong(matcher.group(3));
+ receivingTable.progress =
Double.parseDouble(matcher.group(4));
+
+ return receivingTable;
+ }
+
+ throw new IllegalStateException("Table line does not match
- " + table);
+ }
+
+ public String toString()
+ {
+ return "ReceivingTable{" +
+ "receivedSoFar=" + receivedSoFar +
+ ", toReceive=" + toReceive +
+ ", progress=" + progress +
+ '}';
+ }
+ }
+ }
+
+ public static class SendingStatistics
+ {
+ public SendingHeader sendingHeader;
+ public List<SendingSSTable> sendingSSTable = new ArrayList<>();
+
+ public void parseHeader(String outputLine)
+ {
+ this.sendingHeader = SendingHeader.parseHeader(outputLine);
+ }
+
+ public void parseTable(String table)
+ {
+ sendingSSTable.add(SendingSSTable.parseTable(table));
+ }
+
+ public String toString()
+ {
+ return "SendingStatistics{" +
+ "sendingHeader=" + sendingHeader +
+ ", sendingSSTable=" + sendingSSTable +
+ '}';
+ }
+
+ public static class SendingHeader implements
Comparable<SendingHeader>
+ {
+ private static final Pattern sendingHeaderPattern =
Pattern.compile(
+ "Sending (.*) files, (.*) bytes total. Already sent (.*) files
\\((.*)%\\), (.*) bytes total \\((.*)%\\)"
+ );
+
+ int totalSending = 0;
+ long bytesTotal = 0;
+ int alreadySent = 0;
+ double progressFiles = 0.0f;
+ long bytesTotalSoFar = 0;
+ double progressBytes = 0.0f;
+
+ public static SendingHeader parseHeader(String header)
+ {
+ final Matcher matcher =
sendingHeaderPattern.matcher(header);
+
+ if (matcher.matches())
+ {
+ final SendingHeader sendingHeader = new
SendingHeader();
+
+ sendingHeader.totalSending =
Integer.parseInt(matcher.group(1));
+ sendingHeader.bytesTotal =
Long.parseLong(matcher.group(2));
+ sendingHeader.alreadySent =
Integer.parseInt(matcher.group(3));
+ sendingHeader.progressFiles =
Double.parseDouble(matcher.group(4));
+ sendingHeader.bytesTotalSoFar =
Long.parseLong(matcher.group(5));
+ sendingHeader.progressBytes =
Double.parseDouble(matcher.group(6));
+
+ return sendingHeader;
+ }
+
+ throw new IllegalStateException("Header does not match - "
+ header);
+ }
+
+ public String toString()
+ {
+ return "SendingHeader{" +
+ "totalSending=" + totalSending +
+ ", bytesTotal=" + bytesTotal +
+ ", alreadySent=" + alreadySent +
+ ", progressFiles=" + progressFiles +
+ ", bytesTotalSoFar=" + bytesTotalSoFar +
+ ", progressBytes=" + progressBytes +
+ '}';
+ }
+
+
+ public int compareTo(SendingHeader o)
+ {
+ // progress on bytes has to be strictly lower,
+ // even alreadySent and progressFiles and progressBytes
are same,
+ // bytesTotalSoFar has to be lower, bigger or same
+
+ if (alreadySent <= o.alreadySent
+ && progressFiles <= o.progressFiles
+ && bytesTotalSoFar <= o.bytesTotalSoFar
+ && progressBytes <= o.progressBytes)
+ {
+ return -1;
+ }
+ else if (alreadySent == o.alreadySent
+ && progressFiles == o.progressFiles
+ && bytesTotalSoFar == o.bytesTotalSoFar
+ && progressBytes == o.progressBytes)
+ {
+ return 0;
+ }
+ else if (alreadySent >= o.alreadySent
+ && progressFiles >= o.progressFiles
+ && bytesTotalSoFar > o.bytesTotalSoFar
+ && progressBytes >= o.progressBytes)
+ {
+ return 1;
+ }
+ else
+ {
+ throw new IllegalStateException(String.format("Could
not compare arguments %s and %s", this, o));
+ }
+ }
+ }
+
+ public static class SendingSSTable
+ {
+ private static final Pattern sendingFilePattern =
Pattern.compile("(.*) (.*)/(.*) bytes \\((.*)%\\) sent to (.*)");
+
+ long bytesSent = 0;
+ long bytesInTotal = 0;
+ double progress = 0.0f;
+
+ public static SendingSSTable parseTable(String table)
+ {
+ final Matcher matcher = sendingFilePattern.matcher(table);
+
+ if (matcher.matches())
+ {
+ final SendingSSTable sendingSSTable = new
SendingSSTable();
+
+ sendingSSTable.bytesSent =
Long.parseLong(matcher.group(2));
+ sendingSSTable.bytesInTotal =
Long.parseLong(matcher.group(3));
+ sendingSSTable.progress =
Double.parseDouble(matcher.group(4));
+
+ return sendingSSTable;
+ }
+
+ throw new IllegalStateException("Table does not match - "
+ table);
+ }
+
+ public String toString()
+ {
+ return "SendingSSTable{" +
+ "bytesSent=" + bytesSent +
+ ", bytesInTotal=" + bytesInTotal +
+ ", progress=" + progress +
+ '}';
+ }
+ }
+ }
+ }
+
+ protected static final class NetstatResults
+ {
+ private final List<NodeToolResultWithOutput> netstatOutputs = new
ArrayList<>();
+
+ public void add(NodeToolResultWithOutput result)
+ {
+ netstatOutputs.add(result);
+ }
+
+ public void assertSuccessful()
+ {
+ for (final NodeToolResultWithOutput result : netstatOutputs)
+ {
+ Assert.assertEquals(result.getResult().getRc(), 0);
+ Assert.assertTrue(result.getStderr().isEmpty());
+ }
+ }
+ }
+
+ protected static class NetstatsCallable implements Callable<NetstatResults>
+ {
+ private final IInvokableInstance node;
+
+ public NetstatsCallable(final IInvokableInstance node)
+ {
+ this.node = node;
+ }
+
+ public NetstatResults call() throws Exception
+ {
+ final NetstatResults results = new NetstatResults();
+
+ boolean sawAnyStreamingOutput = false;
+
+ while (true)
+ {
+ try
+ {
+ final NodeToolResultWithOutput result =
NodetoolUtils.nodetool(node, false, "netstats");
+
+ logger.info(node.broadcastAddress().toString() + " " +
result.getStdout());
+
+ if (!sawAnyStreamingOutput)
+ {
+ if (result.getStdout().contains("Receiving") ||
result.getStdout().contains("Sending"))
+ {
+ sawAnyStreamingOutput = true;
+ }
+ }
+
+ if (sawAnyStreamingOutput &&
(!result.getStdout().contains("Receiving") &&
!result.getStdout().contains("Sending")))
+ {
+ break;
+ }
+
+ results.add(result);
+
+ Thread.currentThread().sleep(500);
+ }
+ catch (final Exception ex)
+ {
+ System.out.println(ex.getMessage());
+ Thread.currentThread().sleep(500);
+ }
+ }
+
+ return results;
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
new file mode 100644
index 0000000..7c53426
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+public class NetstatsBootstrapWithEntireSSTablesCompressionStreamingTest
extends AbstractNetstatsBootstrapStreaming
+{
+ @Test
+ public void testWithStreamingEntireSSTablesWithCompression() throws
Exception
+ {
+ executeTest(true, true);
+ }
+
+ @Test
+ public void testWithStreamingEntireSSTablesWithoutCompression() throws
Exception
+ {
+ executeTest(true, false);
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java
new file mode 100644
index 0000000..68b16c2
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+public class NetstatsBootstrapWithoutEntireSSTablesCompressionStreamingTest
extends AbstractNetstatsBootstrapStreaming
+{
+ @Test
+ public void testWithoutStreamingEntireSSTablesWithCompression() throws
Exception
+ {
+ executeTest(false, true);
+ }
+
+ @Test
+ public void testWithoutStreamingEntireSSTablesWithoutCompression() throws
Exception
+ {
+ executeTest(false, false);
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java
new file mode 100644
index 0000000..5f74c77
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/NetstatsRepairStreamingTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class NetstatsRepairStreamingTest extends AbstractNetstatsStreaming
+{
+ @Test
+ public void testWithCompressionEnabled() throws Exception
+ {
+ executeTest(true);
+ }
+
+ @Test
+ public void testWithCompressionDisabled() throws Exception
+ {
+ executeTest(false);
+ }
+
+ private void executeTest(boolean compressionEnabled) throws Exception
+ {
+ final ExecutorService executorService =
Executors.newFixedThreadPool(1);
+
+ try (final Cluster cluster = Cluster.build()
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
+ .withConfig(config ->
config.with(NETWORK, GOSSIP, NATIVE_PROTOCOL)
+
.set("stream_throughput_outbound_megabits_per_sec", 1)
+
.set("compaction_throughput_mb_per_sec", 1)
+
.set("stream_entire_sstables", false)).start())
+ {
+ final IInvokableInstance node1 = cluster.get(1);
+ final IInvokableInstance node2 = cluster.get(2);
+
+ createTable(cluster, 1, compressionEnabled);
+
+ node1.nodetoolResult("disableautocompaction",
"netstats_test").asserts().success();
+ node2.nodetoolResult("disableautocompaction",
"netstats_test").asserts().success();
+
+ populateData(compressionEnabled);
+
+ node1.flush("netstats_test");
+ node2.flush("netstats_test");
+
+ //change RF from 1 to 2 so we need to repair it, repairing will
causes streaming shown in netstats
+ changeReplicationFactor();
+
+ final Future<NetstatResults> resultsFuture1 =
executorService.submit(new NetstatsCallable(node1));
+
+ node1.nodetoolResult("repair",
"netstats_test").asserts().success();
+
+ final NetstatResults results = resultsFuture1.get(1, MINUTES);
+
+ results.assertSuccessful();
+
+ NetstatsOutputParser.validate(NetstatsOutputParser.parse(results));
+ }
+ }
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java
b/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java
new file mode 100644
index 0000000..1bb6adf
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/util/NodetoolUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.distributed.shared.NodeToolResultWithOutput;
+
+public final class NodetoolUtils
+{
+ private NodetoolUtils()
+ {
+
+ }
+
+ public static NodeToolResultWithOutput nodetool(IInvokableInstance inst,
String... args)
+ {
+ return nodetool(inst, true, args);
+ }
+
+ public static NodeToolResultWithOutput nodetool(IInvokableInstance inst,
boolean withNotifications, String... args)
+ {
+ return inst.callOnInstance(() -> {
+ PrintStream originalSysOut = System.out;
+ PrintStream originalSysErr = System.err;
+ originalSysOut.flush();
+ originalSysErr.flush();
+ ByteArrayOutputStream toolOut = new ByteArrayOutputStream();
+ ByteArrayOutputStream toolErr = new ByteArrayOutputStream();
+
+ try (PrintStream newOut = new PrintStream(toolOut);
+ PrintStream newErr = new PrintStream(toolErr))
+ {
+ System.setOut(newOut);
+ System.setErr(newErr);
+ Instance.DTestNodeTool nodetool = new
Instance.DTestNodeTool(withNotifications);
+ int rc = nodetool.execute(args);
+ NodeToolResult result = new NodeToolResult(args, rc,
nodetool.getNotifications(), nodetool.getLatestError());
+ return new NodeToolResultWithOutput(result, toolOut, toolErr);
+ }
+ finally
+ {
+ System.setOut(originalSysOut);
+ System.setErr(originalSysErr);
+ }
+ });
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]