Repository: cassandra
Updated Branches:
  refs/heads/trunk 6a7fad601 -> 613a8b43d


Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle'

patch by Andres de la Peña; reviewed by Paulo Motta for CASSANDRA-13614


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/613a8b43
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/613a8b43
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/613a8b43

Branch: refs/heads/trunk
Commit: 613a8b43d2b5a425080653898b28bde6cd7eb9ba
Parents: 6a7fad6
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
Authored: Thu Jul 6 10:09:29 2017 +0100
Committer: Andrés de la Peña <a.penya.gar...@gmail.com>
Committed: Thu Jul 6 10:09:29 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/batchlog/BatchlogManager.java     | 26 ++++++++++++--
 .../cassandra/config/DatabaseDescriptor.java    |  9 +++--
 .../cassandra/service/StorageService.java       | 11 ++++++
 .../cassandra/service/StorageServiceMBean.java  |  3 ++
 .../org/apache/cassandra/tools/NodeProbe.java   | 10 ++++++
 .../org/apache/cassandra/tools/NodeTool.java    |  2 ++
 .../nodetool/GetBatchlogReplayTrottle.java      | 33 +++++++++++++++++
 .../nodetool/SetBatchlogReplayThrottle.java     | 37 ++++++++++++++++++++
 9 files changed, 128 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c9cad..aa98554 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
  * fix race condition in PendingRepairManager (CASSANDRA-13659)
  * Allow noop incremental repair state transitions (CASSANDRA-13658)
  * Run repair with down replicas (CASSANDRA-10446)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 321fca6..9ca7acf 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -76,6 +76,8 @@ public class BatchlogManager implements BatchlogManagerMBean
     // Single-thread executor service for scheduling and serializing log replay.
     private final ScheduledExecutorService batchlogTasks;
 
+    private final RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);
+
     public BatchlogManager()
     {
         ScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
@@ -194,8 +196,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             logger.trace("Replay cancelled as there are no peers in the ring.");
             return;
         }
-        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / endpointsCount;
-        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
+        setRate(DatabaseDescriptor.getBatchlogReplayThrottleInKB());
 
         UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
         ColumnFamilyStore store = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
@@ -212,6 +213,27 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.trace("Finished replayFailedBatches");
     }
 
+    /**
+     * Sets the rate for the current rate limiter. When {@code throttleInKB} is 0, this sets the rate to
+     * {@link Double#MAX_VALUE} bytes per second.
+     *
+     * @param throttleInKB throughput to set in KB per second
+     */
+    public void setRate(final int throttleInKB)
+    {
+        int endpointsCount = StorageService.instance.getTokenMetadata().getSizeOfAllEndpoints();
+        if (endpointsCount > 0)
+        {
+            int endpointThrottleInKB = throttleInKB / endpointsCount;
+            double throughput = endpointThrottleInKB == 0 ? Double.MAX_VALUE : endpointThrottleInKB * 1024.0;
+            if (rateLimiter.getRate() != throughput)
+            {
+                logger.debug("Updating batchlog replay throttle to {} KB/s, {} KB/s per endpoint", throttleInKB, endpointThrottleInKB);
+                rateLimiter.setRate(throughput);
+            }
+        }
+    }
+
     // read less rows (batches) per page if they are very large
     static int calculatePageSize(ColumnFamilyStore store)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a750901..e369982 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1844,14 +1844,19 @@ public class DatabaseDescriptor
         return conf.hinted_handoff_throttle_in_kb;
     }
 
+    public static void setHintedHandoffThrottleInKB(int throttleInKB)
+    {
+        conf.hinted_handoff_throttle_in_kb = throttleInKB;
+    }
+
     public static int getBatchlogReplayThrottleInKB()
     {
         return conf.batchlog_replay_throttle_in_kb;
     }
 
-    public static void setHintedHandoffThrottleInKB(int throttleInKB)
+    public static void setBatchlogReplayThrottleInKB(int throttleInKB)
     {
-        conf.hinted_handoff_throttle_in_kb = throttleInKB;
+        conf.batchlog_replay_throttle_in_kb = throttleInKB;
     }
 
     public static int getMaxHintsDeliveryThreads()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 2b3e633..2bab471 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1355,6 +1355,17 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         CompactionManager.instance.setRate(value);
     }
 
+    public int getBatchlogReplayThrottleInKB()
+    {
+        return DatabaseDescriptor.getBatchlogReplayThrottleInKB();
+    }
+
+    public void setBatchlogReplayThrottleInKB(int throttleInKB)
+    {
+        DatabaseDescriptor.setBatchlogReplayThrottleInKB(throttleInKB);
+        BatchlogManager.instance.setRate(throttleInKB);
+    }
+
     public int getConcurrentCompactors()
     {
         return DatabaseDescriptor.getConcurrentCompactors();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 9f66a7e..bd8aca6 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -514,6 +514,9 @@ public interface StorageServiceMBean extends NotificationEmitter
     public int getCompactionThroughputMbPerSec();
     public void setCompactionThroughputMbPerSec(int value);
 
+    public int getBatchlogReplayThrottleInKB();
+    public void setBatchlogReplayThrottleInKB(int value);
+
     public int getConcurrentCompactors();
     public void setConcurrentCompactors(int value);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index fcae1f6..2615412 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1028,6 +1028,16 @@ public class NodeProbe implements AutoCloseable
         return ssProxy.getCompactionThroughputMbPerSec();
     }
 
+    public void setBatchlogReplayThrottle(int value)
+    {
+        ssProxy.setBatchlogReplayThrottleInKB(value);
+    }
+
+    public int getBatchlogReplayThrottle()
+    {
+        return ssProxy.getBatchlogReplayThrottleInKB();
+    }
+
     public void setConcurrentCompactors(int value)
     {
         ssProxy.setConcurrentCompactors(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 31a369d..a117025 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -77,6 +77,7 @@ public class NodeTool
                 DisableGossip.class,
                 EnableHandoff.class,
                 GcStats.class,
+                GetBatchlogReplayTrottle.class,
                 GetCompactionThreshold.class,
                 GetCompactionThroughput.class,
                 GetTimeout.class,
@@ -104,6 +105,7 @@ public class NodeTool
                 ReplayBatchlog.class,
                 SetCacheCapacity.class,
                 SetHintedHandoffThrottleInKB.class,
+                SetBatchlogReplayThrottle.class,
                 SetCompactionThreshold.class,
                 SetCompactionThroughput.class,
                 GetConcurrentCompactors.class,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/tools/nodetool/GetBatchlogReplayTrottle.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/GetBatchlogReplayTrottle.java b/src/java/org/apache/cassandra/tools/nodetool/GetBatchlogReplayTrottle.java
new file mode 100644
index 0000000..661c495
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/GetBatchlogReplayTrottle.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "getbatchlogreplaythrottle", description = "Print batchlog replay throttle in KB/s. " +
+                                                           "This is reduced proportionally to the number of nodes in the cluster.")
+public class GetBatchlogReplayTrottle extends NodeToolCmd
+{
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        System.out.println("Batchlog replay throttle: " + probe.getBatchlogReplayThrottle() + " KB/s");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/613a8b43/src/java/org/apache/cassandra/tools/nodetool/SetBatchlogReplayThrottle.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/SetBatchlogReplayThrottle.java b/src/java/org/apache/cassandra/tools/nodetool/SetBatchlogReplayThrottle.java
new file mode 100644
index 0000000..65bb8f5
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/SetBatchlogReplayThrottle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "setbatchlogreplaythrottle", description = "Set batchlog replay throttle in KB per second, or 0 to disable throttling. " +
+                                                           "This will be reduced proportionally to the number of nodes in the cluster.")
+public class SetBatchlogReplayThrottle extends NodeToolCmd
+{
+    @Arguments(title = "batchlog_replay_throttle", usage = "<value_in_kb_per_sec>", description = "Value in KB per second, 0 to disable throttling", required = true)
+    private Integer batchlogReplayThrottle = null;
+
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        probe.setBatchlogReplayThrottle(batchlogReplayThrottle);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to