This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 26ff589f3d Expose current compaction throughput in nodetool
26ff589f3d is described below
commit 26ff589f3da0a66c10c5ca16451a1c49fbb57ade
Author: maoling <[email protected]>
AuthorDate: Wed Jun 12 23:14:00 2024 +0800
Expose current compaction throughput in nodetool
patch by Ling Mao; reviewed by Jon Haddad, Stefan Miklosovic for
CASSANDRA-13890
Co-authored-by: Jon Haddad <[email protected]>
---
CHANGES.txt | 1 +
.../cassandra/db/compaction/CompactionManager.java | 11 ++++++++---
.../apache/cassandra/db/compaction/CompactionTask.java | 2 +-
.../org/apache/cassandra/metrics/CompactionMetrics.java | 3 +++
.../org/apache/cassandra/service/StorageService.java | 17 +++++++++++++++++
.../apache/cassandra/service/StorageServiceMBean.java | 1 +
src/java/org/apache/cassandra/tools/NodeProbe.java | 5 +++++
.../cassandra/tools/nodetool/CompactionStats.java | 4 ++++
.../tools/nodetool/GetCompactionThroughput.java | 9 ++++++++-
.../cassandra/tools/nodetool/CompactionStatsTest.java | 3 +++
.../tools/nodetool/SetGetCompactionThroughputTest.java | 15 +++++++++++++--
11 files changed, 64 insertions(+), 7 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index e56329be91..87987f8046 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Expose current compaction throughput in nodetool (CASSANDRA-13890)
* CEP-24 Password validation / generation (CASSANDRA-17457)
* Reconfigure CMS after replacement, bootstrap and move operations
(CASSANDRA-19705)
* Support querying LocalStrategy tables with any partitioner (CASSANDRA-19692)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 09dbd872fc..ee20b28c50 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -57,7 +57,7 @@ import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import com.codahale.metrics.Meter;
import net.openhft.chronicle.core.util.ThrowingSupplier;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.ExecutorFactory;
@@ -120,7 +120,6 @@ import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.Refs;
-
import static java.util.Collections.singleton;
import static
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.concurrent.FutureTask.callable;
@@ -224,6 +223,11 @@ public class CompactionManager implements
CompactionManagerMBean, ICompactionMan
compactionRateLimiter.setRate(throughput);
}
+ public Meter getCompactionThroughput()
+ {
+ return metrics.bytesCompactedThroughput;
+ }
+
/**
* Call this whenever a compaction might be needed on the given
columnfamily.
* It's okay to over-call (within reason) if a call is unnecessary, it will
@@ -1510,9 +1514,10 @@ public class CompactionManager implements
CompactionManagerMBean, ICompactionMan
}
- static void compactionRateLimiterAcquire(RateLimiter limiter, long
bytesScanned, long lastBytesScanned, double compressionRatio)
+ protected void compactionRateLimiterAcquire(RateLimiter limiter, long
bytesScanned, long lastBytesScanned, double compressionRatio)
{
long lengthRead = (long) ((bytesScanned - lastBytesScanned) *
compressionRatio) + 1;
+ metrics.bytesCompactedThroughput.mark(lengthRead);
while (lengthRead >= Integer.MAX_VALUE)
{
limiter.acquire(Integer.MAX_VALUE);
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4ca0e0f53f..79368f68d3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -212,7 +212,7 @@ public class CompactionTask extends AbstractCompactionTask
long bytesScanned = scanners.getTotalBytesScanned();
// Rate limit the scanners, and account for compression
-
CompactionManager.compactionRateLimiterAcquire(limiter, bytesScanned,
lastBytesScanned, compressionRatio);
+
CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned,
lastBytesScanned, compressionRatio);
lastBytesScanned = bytesScanned;
diff --git a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
index 8bd48e520e..3bdd14a0bb 100644
--- a/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CompactionMetrics.java
@@ -53,6 +53,8 @@ public class CompactionMetrics
public final Meter totalCompactionsCompleted;
/** Total number of bytes compacted since server [re]start */
public final Counter bytesCompacted;
+ /** Recent/current throughput of compactions take */
+ public final Meter bytesCompactedThroughput;
/** Time spent redistributing index summaries */
public final Timer indexSummaryRedistributionTime;
@@ -147,6 +149,7 @@ public class CompactionMetrics
});
totalCompactionsCompleted =
Metrics.meter(factory.createMetricName("TotalCompactionsCompleted"));
bytesCompacted =
Metrics.counter(factory.createMetricName("BytesCompacted"));
+ bytesCompactedThroughput =
Metrics.meter(factory.createMetricName("BytesCompactedThroughput"));
// compaction failure metrics
compactionsReduced =
Metrics.counter(factory.createMetricName("CompactionsReduced"));
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index d4767bc951..a652e39fc1 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -78,6 +78,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.codahale.metrics.Meter;
import org.apache.cassandra.audit.AuditLogManager;
import org.apache.cassandra.audit.AuditLogOptions;
import org.apache.cassandra.auth.AuthCacheService;
@@ -245,6 +246,7 @@ import static
org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_AD
import static
org.apache.cassandra.config.CassandraRelevantProperties.TEST_WRITE_SURVEY;
import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
import static
org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
+import static org.apache.cassandra.io.util.FileUtils.ONE_MIB;
import static
org.apache.cassandra.schema.SchemaConstants.isLocalSystemKeyspace;
import static
org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import static
org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor;
@@ -1415,6 +1417,21 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
value, oldValue);
}
+ /**
+ * Get the Current Compaction Throughput
+ * key is 1/5/15minute time dimension for statistics
+ * value is the metric double string (unit is:mib/s)
+ */
+ public Map<String, String> getCurrentCompactionThroughputMebibytesPerSec()
+ {
+ HashMap<String, String> result = new LinkedHashMap<>();
+ Meter rate = CompactionManager.instance.getCompactionThroughput();
+ result.put("1minute", String.format("%.3f", rate.getOneMinuteRate() /
ONE_MIB));
+ result.put("5minute", String.format("%.3f", rate.getFiveMinuteRate() /
ONE_MIB));
+ result.put("15minute", String.format("%.3f",
rate.getFifteenMinuteRate() / ONE_MIB));
+ return result;
+ }
+
public int getBatchlogReplayThrottleInKB()
{
return DatabaseDescriptor.getBatchlogReplayThrottleInKiB();
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 6fb34b2f20..da4206416f 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -807,6 +807,7 @@ public interface StorageServiceMBean extends
NotificationEmitter
@Deprecated(since = "4.1")
public int getCompactionThroughputMbPerSec();
public void setCompactionThroughputMbPerSec(int value);
+ Map<String, String> getCurrentCompactionThroughputMebibytesPerSec();
public int getBatchlogReplayThrottleInKB();
public void setBatchlogReplayThrottleInKB(int value);
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index b121cb3100..047448c208 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1426,6 +1426,11 @@ public class NodeProbe implements AutoCloseable
return ssProxy.getCompactionThroughtputBytesPerSec();
}
+ public Map<String, String> getCurrentCompactionThroughputMiBPerSec()
+ {
+ return ssProxy.getCurrentCompactionThroughputMebibytesPerSec();
+ }
+
public void setBatchlogReplayThrottle(int value)
{
ssProxy.setBatchlogReplayThrottleInKB(value);
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
index c80de91d97..f76d4d0191 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
@@ -111,6 +111,10 @@ public class CompactionStats extends NodeToolCmd
double configured =
probe.getStorageService().getCompactionThroughtputMibPerSecAsDouble();
tableBuilder.add("compaction throughput (MiB/s)", configured == 0 ?
"throttling disabled (0)" : Double.toString(configured));
+ Map<String, String> currentCompactionThroughputMetricsMap =
probe.getCurrentCompactionThroughputMiBPerSec();
+ tableBuilder.add("current compaction throughput (1 minute)",
currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s");
+ tableBuilder.add("current compaction throughput (5 minute)",
currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s");
+ tableBuilder.add("current compaction throughput (15 minute)",
currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s");
}
public static void reportCompactionTable(List<Map<String,String>>
compactions, long compactionThroughputInBytes, boolean humanReadable,
PrintStream out, TableBuilder table)
diff --git
a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java
b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java
index e71fe0adef..8296511c0a 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetCompactionThroughput.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.tools.nodetool;
+import java.util.Map;
+
import com.google.common.math.DoubleMath;
import io.airlift.airline.Command;
@@ -44,7 +46,12 @@ public class GetCompactionThroughput extends NodeToolCmd
if (!DoubleMath.isMathematicalInteger(throughput))
throw new RuntimeException("Use the -d flag to quiet this
error and get the exact throughput in MiB/s");
- probe.output().out.println("Current compaction throughput: " +
probe.getCompactionThroughput() + " MB/s");
+ probe.output().out.println("Current compaction throughput: " +
probe.getCompactionThroughput() + " MiB/s");
}
+
+ Map<String, String> currentCompactionThroughputMetricsMap =
probe.getCurrentCompactionThroughputMiBPerSec();
+ probe.output().out.println("Current compaction throughput (1 minute):
" + currentCompactionThroughputMetricsMap.get("1minute") + " MiB/s");
+ probe.output().out.println("Current compaction throughput (5 minute):
" + currentCompactionThroughputMetricsMap.get("5minute") + " MiB/s");
+ probe.output().out.println("Current compaction throughput (15 minute):
" + currentCompactionThroughputMetricsMap.get("15minute") + " MiB/s");
}
}
diff --git
a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java
b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java
index 8758c3f8fd..65cc638531 100644
--- a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java
+++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java
@@ -140,6 +140,9 @@ public class CompactionStatsTest extends CQLTester
assertThat(stdout).containsPattern("15 minute
rate\\s+[0-9]*.[0-9]*[0-9]*/minute");
assertThat(stdout).containsPattern("mean
rate\\s+[0-9]*.[0-9]*[0-9]*/hour");
assertThat(stdout).containsPattern("compaction throughput
\\(MiB/s\\)\\s+throttling disabled \\(0\\)");
+ assertThat(stdout).containsPattern("current compaction throughput \\(1
minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s");
+ assertThat(stdout).containsPattern("current compaction throughput \\(5
minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s");
+ assertThat(stdout).containsPattern("current compaction throughput
\\(15 minute\\)\\s+[0-9]*.[0-9]*[0-9]* MiB/s");
CompactionManager.instance.active.finishCompaction(compactionHolder);
waitForNumberOfPendingTasks(0, "compactionstats");
diff --git
a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
index 24ee9e5797..2771881dc2 100644
---
a/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
+++
b/test/unit/org/apache/cassandra/tools/nodetool/SetGetCompactionThroughputTest.java
@@ -81,6 +81,17 @@ public class SetGetCompactionThroughputTest extends CQLTester
assertPreciseMibFlagNeeded();
}
+ @Test
+ public void testCurrentCompactionThroughput()
+ {
+ ToolResult tool = invokeNodetool("getcompactionthroughput");
+ tool.assertOnCleanExit();
+
+ assertThat(tool.getStdout()).containsPattern("Current compaction
throughput \\(1 minute\\): \\d+\\.\\d+ MiB/s");
+ assertThat(tool.getStdout()).containsPattern("Current compaction
throughput \\(5 minute\\): \\d+\\.\\d+ MiB/s");
+ assertThat(tool.getStdout()).containsPattern("Current compaction
throughput \\(15 minute\\): \\d+\\.\\d+ MiB/s");
+ }
+
private static void assertSetGetValidThroughput(int throughput)
{
ToolResult tool = invokeNodetool("setcompactionthroughput",
String.valueOf(throughput));
@@ -129,9 +140,9 @@ public class SetGetCompactionThroughputTest extends
CQLTester
tool.assertOnCleanExit();
if (expected > 0)
- assertThat(tool.getStdout()).contains("Current compaction
throughput: " + expected + " MB/s");
+ assertThat(tool.getStdout()).contains("Current compaction
throughput: " + expected + " MiB/s");
else
- assertThat(tool.getStdout()).contains("Current compaction
throughput: 0 MB/s");
+ assertThat(tool.getStdout()).contains("Current compaction
throughput: 0 MiB/s");
}
private static void assertGetThroughputDouble(double expected)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]