Support optional backpressure strategies at the coordinator patch by Sergio Bossa; reviewed by Stefania Alborghetti for CASSANDRA-9318
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d43b9ce5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d43b9ce5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d43b9ce5 Branch: refs/heads/trunk Commit: d43b9ce5092f8879a1a66afebab74d86e9e127fb Parents: 560faba Author: Sergio Bossa <sergio.bo...@gmail.com> Authored: Mon Sep 19 10:42:50 2016 +0800 Committer: Stefania Alborghetti <stefania.alborghe...@datastax.com> Committed: Tue Sep 20 09:07:36 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 22 + .../org/apache/cassandra/config/Config.java | 4 +- .../cassandra/config/DatabaseDescriptor.java | 46 +++ .../apache/cassandra/hints/HintsDispatcher.java | 6 + .../apache/cassandra/net/BackPressureState.java | 51 +++ .../cassandra/net/BackPressureStrategy.java | 42 ++ .../apache/cassandra/net/IAsyncCallback.java | 5 + .../apache/cassandra/net/MessagingService.java | 124 +++++- .../cassandra/net/MessagingServiceMBean.java | 18 +- .../net/OutboundTcpConnectionPool.java | 12 +- .../cassandra/net/RateBasedBackPressure.java | 296 ++++++++++++++ .../net/RateBasedBackPressureState.java | 133 ++++++ .../cassandra/net/ResponseVerbHandler.java | 5 + .../service/AbstractWriteResponseHandler.java | 27 +- .../apache/cassandra/service/StorageProxy.java | 37 +- .../apache/cassandra/utils/SlidingTimeRate.java | 167 ++++++++ .../cassandra/utils/SystemTimeSource.java | 54 +++ .../apache/cassandra/utils/TestRateLimiter.java | 58 +++ .../org/apache/cassandra/utils/TimeSource.java | 58 +++ .../utils/concurrent/IntervalLock.java | 69 ++++ test/resources/byteman/mutation_limiter.btm | 8 + .../config/DatabaseDescriptorRefTest.java | 1 + .../cassandra/net/MessagingServiceTest.java | 247 ++++++++++- .../net/RateBasedBackPressureTest.java | 409 +++++++++++++++++++ .../cassandra/utils/SlidingTimeRateTest.java | 146 +++++++ 
.../apache/cassandra/utils/TestTimeSource.java | 72 ++++ 27 files changed, 2072 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b625a58..e9e8ccf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318) * Make randompartitioner work with new vnode allocation (CASSANDRA-12647) * Fix cassandra-stress graphing (CASSANDRA-12237) * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index a25e084..8a0b3ee 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1172,3 +1172,25 @@ gc_warn_threshold_in_ms: 1000 # early. Any value size larger than this threshold will result into marking an SSTable # as corrupted. # max_value_size_in_mb: 256 + +# Back-pressure settings # +# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation +# sent to replicas, with the aim of reducing pressure on overloaded replicas. +back_pressure_enabled: false +# The back-pressure strategy applied. +# The default implementation, RateBasedBackPressure, takes three arguments: +# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests. 
+# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor; +# if above high ratio, the rate limiting is increased by the given factor; +# such factor is usually best configured between 1 and 10, use larger values for a faster recovery +# at the expense of potentially more dropped mutations; +# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica, +# if SLOW at the speed of the slowest one. +# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackPressureStrategy and +# provide a public constructor accepting a Map<String, Object>. +back_pressure_strategy: + - class_name: org.apache.cassandra.net.RateBasedBackPressure + parameters: + - high_ratio: 0.90 + factor: 5 + flow: FAST http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index f2f21ad..e6b3638 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -46,7 +46,6 @@ public class Config */ public static final String PROPERTY_PREFIX = "cassandra."; - public String cluster_name = "Test Cluster"; public String authenticator; public String authorizer; @@ -355,6 +354,9 @@ public class Config */ public UserFunctionTimeoutPolicy user_function_timeout_policy = UserFunctionTimeoutPolicy.die; + public volatile boolean back_pressure_enabled = false; + public volatile ParameterizedClass back_pressure_strategy; + public static boolean getOutboundBindAny() { return outboundBindAny; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index dc4cc36..ce889ff 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -19,6 +19,7 @@ package org.apache.cassandra.config; import java.io.File; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.*; import java.nio.file.FileStore; import java.nio.file.Files; @@ -53,6 +54,8 @@ import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.locator.EndpointSnitchInfo; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.SeedProvider; +import org.apache.cassandra.net.BackPressureStrategy; +import org.apache.cassandra.net.RateBasedBackPressure; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.scheduler.NoScheduler; import org.apache.cassandra.security.EncryptionContext; @@ -110,6 +113,7 @@ public class DatabaseDescriptor private static EncryptionContext encryptionContext; private static boolean hasLoggedConfig; + private static BackPressureStrategy backPressureStrategy; private static DiskOptimizationStrategy diskOptimizationStrategy; private static boolean clientInitialized; @@ -706,6 +710,27 @@ public class DatabaseDescriptor diskOptimizationStrategy = new SpinningDiskOptimizationStrategy(); break; } + + try + { + ParameterizedClass strategy = conf.back_pressure_strategy != null ? 
conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams(); + Class<?> clazz = Class.forName(strategy.class_name); + if (!BackPressureStrategy.class.isAssignableFrom(clazz)) + throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false); + + Constructor<?> ctor = clazz.getConstructor(Map.class); + BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters); + logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy); + backPressureStrategy = instance; + } + catch (ConfigurationException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex); + } } public static void applyAddressConfig() throws ConfigurationException @@ -2342,4 +2367,25 @@ public class DatabaseDescriptor { return Integer.parseInt(System.getProperty("cassandra.search_concurrency_factor", "1")); } + + public static void setBackPressureEnabled(boolean backPressureEnabled) + { + conf.back_pressure_enabled = backPressureEnabled; + } + + public static boolean backPressureEnabled() + { + return conf.back_pressure_enabled; + } + + @VisibleForTesting + public static void setBackPressureStrategy(BackPressureStrategy strategy) + { + backPressureStrategy = strategy; + } + + public static BackPressureStrategy getBackPressureStrategy() + { + return backPressureStrategy; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/hints/HintsDispatcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/HintsDispatcher.java b/src/java/org/apache/cassandra/hints/HintsDispatcher.java index 29bab80..6940e1e 100644 --- a/src/java/org/apache/cassandra/hints/HintsDispatcher.java +++ 
b/src/java/org/apache/cassandra/hints/HintsDispatcher.java @@ -209,5 +209,11 @@ final class HintsDispatcher implements AutoCloseable { return false; } + + @Override + public boolean supportsBackPressure() + { + return true; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/BackPressureState.java b/src/java/org/apache/cassandra/net/BackPressureState.java new file mode 100644 index 0000000..34fd0dd --- /dev/null +++ b/src/java/org/apache/cassandra/net/BackPressureState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; + +/** + * Interface meant to track the back-pressure state per replica host. + */ +public interface BackPressureState +{ + /** + * Called when a message is sent to a replica. + */ + void onMessageSent(MessageOut<?> message); + + /** + * Called when a response is received from a replica. + */ + void onResponseReceived(); + + /** + * Called when no response is received from replica. 
+ */ + void onResponseTimeout(); + + /** + * Gets the current back-pressure rate limit. + */ + double getBackPressureRateLimit(); + + /** + * Returns the host this state refers to. + */ + InetAddress getHost(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/BackPressureStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/BackPressureStrategy.java b/src/java/org/apache/cassandra/net/BackPressureStrategy.java new file mode 100644 index 0000000..b61a0a1 --- /dev/null +++ b/src/java/org/apache/cassandra/net/BackPressureStrategy.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Back-pressure algorithm interface. + * <br/> + * For experts usage only. Implementors must provide a constructor accepting a single {@code Map<String, Object>} argument, + * representing any parameters eventually required by the specific implementation. 
+ */ +public interface BackPressureStrategy<S extends BackPressureState> +{ + /** + * Applies the back-pressure algorithm, based and acting on the given {@link BackPressureState}s, and up to the given + * timeout. + */ + void apply(Set<S> states, long timeout, TimeUnit unit); + + /** + * Creates a new {@link BackPressureState} initialized as needed by the specific implementation. + */ + S newState(InetAddress host); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/IAsyncCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java index d159e0c..7835079 100644 --- a/src/java/org/apache/cassandra/net/IAsyncCallback.java +++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java @@ -49,4 +49,9 @@ public interface IAsyncCallback<T> * given as input to the dynamic snitch. */ boolean isLatencyForSnitch(); + + default boolean supportsBackPressure() + { + return false; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index e72a9a2..7632ebd 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -28,6 +28,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -39,8 +41,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; 
import org.cliffc.high_scale_lib.NonBlockingHashMap; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.concurrent.ExecutorLocals; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.Stage; @@ -419,7 +423,6 @@ public final class MessagingService implements MessagingServiceMBean Verb.BATCH_STORE, Verb.BATCH_REMOVE); - private static final class DroppedMessages { final DroppedMessageMetrics metrics; @@ -445,20 +448,8 @@ public final class MessagingService implements MessagingServiceMBean // message sinks are a testing hook private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>(); - public void addMessageSink(IMessageSink sink) - { - messageSinks.add(sink); - } - - public void removeMessageSink(IMessageSink sink) - { - messageSinks.remove(sink); - } - - public void clearMessageSinks() - { - messageSinks.clear(); - } + // back-pressure implementation + private final BackPressureStrategy backPressure = DatabaseDescriptor.getBackPressureStrategy(); private static class MSHandle { @@ -504,9 +495,17 @@ public final class MessagingService implements MessagingServiceMBean public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair) { final CallbackInfo expiredCallbackInfo = pair.right.value; + maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout); + ConnectionMetrics.totalTimeouts.mark(); getConnectionPool(expiredCallbackInfo.target).incrementTimeout(); + + if (expiredCallbackInfo.callback.supportsBackPressure()) + { + updateBackPressureOnReceive(expiredCallbackInfo.target, expiredCallbackInfo.callback, true); + } + if (expiredCallbackInfo.isFailureCallback()) { StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable() @@ -545,6 +544,76 @@ public final class MessagingService implements MessagingServiceMBean } } + public void addMessageSink(IMessageSink sink) + { + messageSinks.add(sink); + } + + public 
void removeMessageSink(IMessageSink sink) + { + messageSinks.remove(sink); + } + + public void clearMessageSinks() + { + messageSinks.clear(); + } + + /** + * Updates the back-pressure state on sending to the given host if enabled and the given message callback supports it. + * + * @param host The replica host the back-pressure state refers to. + * @param callback The message callback. + * @param message The actual message. + */ + public void updateBackPressureOnSend(InetAddress host, IAsyncCallback callback, MessageOut<?> message) + { + if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) + { + BackPressureState backPressureState = getConnectionPool(host).getBackPressureState(); + backPressureState.onMessageSent(message); + } + } + + /** + * Updates the back-pressure state on reception from the given host if enabled and the given message callback supports it. + * + * @param host The replica host the back-pressure state refers to. + * @param callback The message callback. + * @param timeout True if updated following a timeout, false otherwise. + */ + public void updateBackPressureOnReceive(InetAddress host, IAsyncCallback callback, boolean timeout) + { + if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure()) + { + BackPressureState backPressureState = getConnectionPool(host).getBackPressureState(); + if (!timeout) + backPressureState.onResponseReceived(); + else + backPressureState.onResponseTimeout(); + } + } + + /** + * Applies back-pressure for the given hosts, according to the configured strategy. + * + * If the local host is present, it is removed from the pool, as back-pressure is only applied + * to remote hosts. + * + * @param hosts The hosts to apply back-pressure to. + * @param timeoutInNanos The max back-pressure timeout. 
+ */ + public void applyBackPressure(Iterable<InetAddress> hosts, long timeoutInNanos) + { + if (DatabaseDescriptor.backPressureEnabled()) + { + backPressure.apply(StreamSupport.stream(hosts.spliterator(), false) + .filter(h -> !h.equals(FBUtilities.getBroadcastAddress())) + .map(h -> getConnectionPool(h).getBackPressureState()) + .collect(Collectors.toSet()), timeoutInNanos, TimeUnit.NANOSECONDS); + } + } + /** * Track latency information for the dynamic snitch * @@ -699,7 +768,7 @@ public final class MessagingService implements MessagingServiceMBean OutboundTcpConnectionPool cp = connectionManagers.get(to); if (cp == null) { - cp = new OutboundTcpConnectionPool(to); + cp = new OutboundTcpConnectionPool(to, backPressure.newState(to)); OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp); if (existingPool != null) cp = existingPool; @@ -805,6 +874,7 @@ public final class MessagingService implements MessagingServiceMBean public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb, long timeout, boolean failureCallback) { int id = addCallback(cb, message, to, timeout, failureCallback); + updateBackPressureOnSend(to, cb, message); sendOneWay(failureCallback ? 
message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE) : message, id, to); return id; } @@ -827,6 +897,7 @@ public final class MessagingService implements MessagingServiceMBean boolean allowHints) { int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints); + updateBackPressureOnSend(to, handler, message); sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), id, to); return id; } @@ -1379,6 +1450,27 @@ public final class MessagingService implements MessagingServiceMBean return result; } + public Map<String, Double> getBackPressurePerHost() + { + Map<String, Double> map = new HashMap<>(connectionManagers.size()); + for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet()) + map.put(entry.getKey().getHostAddress(), entry.getValue().getBackPressureState().getBackPressureRateLimit()); + + return map; + } + + @Override + public void setBackPressureEnabled(boolean enabled) + { + DatabaseDescriptor.setBackPressureEnabled(enabled); + } + + @Override + public boolean isBackPressureEnabled() + { + return DatabaseDescriptor.backPressureEnabled(); + } + public static IPartitioner globalPartitioner() { return StorageService.instance.getTokenMetadata().partitioner; http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/MessagingServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java index 3bcb0d5..b2e79e0 100644 --- a/src/java/org/apache/cassandra/net/MessagingServiceMBean.java +++ b/src/java/org/apache/cassandra/net/MessagingServiceMBean.java @@ -23,8 +23,7 @@ import java.net.UnknownHostException; import java.util.Map; /** - * MBean exposing MessagingService metrics. 
- * - OutboundConnectionPools - Command/Response - Pending/Completed Tasks + * MBean exposing MessagingService metrics plus allowing to enable/disable back-pressure. */ public interface MessagingServiceMBean { @@ -88,5 +87,20 @@ public interface MessagingServiceMBean */ public Map<String, Long> getTimeoutsPerHost(); + /** + * Back-pressure rate limiting per host + */ + public Map<String, Double> getBackPressurePerHost(); + + /** + * Enable/Disable back-pressure + */ + public void setBackPressureEnabled(boolean enabled); + + /** + * Get back-pressure enabled state + */ + public boolean isBackPressureEnabled(); + public int getVersion(String address) throws UnknownHostException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 0418ff6..b0391ba 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -50,7 +50,10 @@ public class OutboundTcpConnectionPool private InetAddress resetEndpoint; private ConnectionMetrics metrics; - OutboundTcpConnectionPool(InetAddress remoteEp) + // back-pressure state linked to this connection: + private final BackPressureState backPressureState; + + OutboundTcpConnectionPool(InetAddress remoteEp, BackPressureState backPressureState) { id = remoteEp; resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp); @@ -59,6 +62,8 @@ public class OutboundTcpConnectionPool smallMessages = new OutboundTcpConnection(this, "Small"); largeMessages = new OutboundTcpConnection(this, "Large"); gossipMessages = new OutboundTcpConnection(this, "Gossip"); + + this.backPressureState = backPressureState; } /** @@ -74,6 +79,11 @@ public class OutboundTcpConnectionPool : 
smallMessages; } + public BackPressureState getBackPressureState() + { + return backPressureState; + } + void reset() { for (OutboundTcpConnection conn : new OutboundTcpConnection[] { smallMessages, largeMessages, gossipMessages }) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressure.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java new file mode 100644 index 0000000..1dae243 --- /dev/null +++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.RateLimiter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.SystemTimeSource; +import org.apache.cassandra.utils.TimeSource; +import org.apache.cassandra.utils.concurrent.IntervalLock; + +/** + * Back-pressure algorithm based on rate limiting according to the ratio between incoming and outgoing rates, computed + * over a sliding time window with size equal to write RPC timeout. 
+ */ +public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBackPressureState> +{ + static final String HIGH_RATIO = "high_ratio"; + static final String FACTOR = "factor"; + static final String FLOW = "flow"; + private static final String BACK_PRESSURE_HIGH_RATIO = "0.90"; + private static final String BACK_PRESSURE_FACTOR = "5"; + private static final String BACK_PRESSURE_FLOW = "FAST"; + + private static final Logger logger = LoggerFactory.getLogger(RateBasedBackPressure.class); + private static final NoSpamLogger tenSecsNoSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.SECONDS); + private static final NoSpamLogger oneMinNoSpamLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); + + protected final TimeSource timeSource; + protected final double highRatio; + protected final int factor; + protected final Flow flow; + protected final long windowSize; + + private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters = + CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build(); + + enum Flow + { + FAST, + SLOW + } + + public static ParameterizedClass withDefaultParams() + { + return new ParameterizedClass(RateBasedBackPressure.class.getName(), + ImmutableMap.of(HIGH_RATIO, BACK_PRESSURE_HIGH_RATIO, + FACTOR, BACK_PRESSURE_FACTOR, + FLOW, BACK_PRESSURE_FLOW)); + } + + public RateBasedBackPressure(Map<String, Object> args) + { + this(args, new SystemTimeSource(), DatabaseDescriptor.getWriteRpcTimeout()); + } + + @VisibleForTesting + public RateBasedBackPressure(Map<String, Object> args, TimeSource timeSource, long windowSize) + { + if (args.size() != 3) + { + throw new IllegalArgumentException(RateBasedBackPressure.class.getCanonicalName() + + " requires 3 arguments: high ratio, back-pressure factor and flow type."); + } + + try + { + highRatio = Double.parseDouble(args.getOrDefault(HIGH_RATIO, "").toString().trim()); + factor = Integer.parseInt(args.getOrDefault(FACTOR, 
"").toString().trim()); + flow = Flow.valueOf(args.getOrDefault(FLOW, "").toString().trim().toUpperCase()); + } + catch (Exception ex) + { + throw new IllegalArgumentException(ex.getMessage(), ex); + } + + if (highRatio <= 0 || highRatio > 1) + { + throw new IllegalArgumentException("Back-pressure high ratio must be > 0 and <= 1"); + } + if (factor < 1) + { + throw new IllegalArgumentException("Back-pressure factor must be >= 1"); + } + if (windowSize < 10) + { + throw new IllegalArgumentException("Back-pressure window size must be >= 10"); + } + + this.timeSource = timeSource; + this.windowSize = windowSize; + + logger.info("Initialized back-pressure with high ratio: {}, factor: {}, flow: {}, window size: {}.", + highRatio, factor, flow, windowSize); + } + + @Override + public void apply(Set<RateBasedBackPressureState> states, long timeout, TimeUnit unit) + { + // Go through the back-pressure states, try updating each of them and collect min/max rates: + boolean isUpdated = false; + double minRateLimit = Double.POSITIVE_INFINITY; + double maxRateLimit = Double.NEGATIVE_INFINITY; + double minIncomingRate = Double.POSITIVE_INFINITY; + RateLimiter currentMin = null; + RateLimiter currentMax = null; + for (RateBasedBackPressureState backPressure : states) + { + // Get the incoming/outgoing rates: + double incomingRate = backPressure.incomingRate.get(TimeUnit.SECONDS); + double outgoingRate = backPressure.outgoingRate.get(TimeUnit.SECONDS); + // Compute the min incoming rate: + if (incomingRate < minIncomingRate) + minIncomingRate = incomingRate; + + // Try acquiring the interval lock: + if (backPressure.tryIntervalLock(windowSize)) + { + // If acquired, proceed updating this back-pressure state rate limit: + isUpdated = true; + try + { + RateLimiter limiter = backPressure.rateLimiter; + + // If we have sent any outgoing requests during this time window, go ahead with rate limiting + // (this is safe against concurrent back-pressure state updates thanks to the 
rw-locking in + // RateBasedBackPressureState): + if (outgoingRate > 0) + { + // Compute the incoming/outgoing ratio: + double actualRatio = incomingRate / outgoingRate; + + // If the ratio is above the high mark, try growing by the back-pressure factor: + if (actualRatio >= highRatio) + { + // Only if the outgoing rate is able to keep up with the rate increase: + if (limiter.getRate() <= outgoingRate) + { + double newRate = limiter.getRate() + ((limiter.getRate() * factor) / 100); + if (newRate > 0 && newRate != Double.POSITIVE_INFINITY) + { + limiter.setRate(newRate); + } + } + } + // If below, set the rate limiter at the incoming rate, decreased by factor: + else + { + // Only if the new rate is actually less than the actual rate: + double newRate = incomingRate - ((incomingRate * factor) / 100); + if (newRate > 0 && newRate < limiter.getRate()) + { + limiter.setRate(newRate); + } + } + + logger.trace("Back-pressure state for {}: incoming rate {}, outgoing rate {}, ratio {}, rate limiting {}", + backPressure.getHost(), incomingRate, outgoingRate, actualRatio, limiter.getRate()); + } + // Otherwise reset the rate limiter: + else + { + limiter.setRate(Double.POSITIVE_INFINITY); + } + + // Housekeeping: pruning windows and resetting the last check timestamp! 
+ backPressure.incomingRate.prune(); + backPressure.outgoingRate.prune(); + } + finally + { + backPressure.releaseIntervalLock(); + } + } + if (backPressure.rateLimiter.getRate() <= minRateLimit) + { + minRateLimit = backPressure.rateLimiter.getRate(); + currentMin = backPressure.rateLimiter; + } + if (backPressure.rateLimiter.getRate() >= maxRateLimit) + { + maxRateLimit = backPressure.rateLimiter.getRate(); + currentMax = backPressure.rateLimiter; + } + } + + // Now find the rate limiter corresponding to the replica group represented by these back-pressure states: + if (!states.isEmpty()) + { + try + { + // Get the rate limiter: + IntervalRateLimiter rateLimiter = rateLimiters.get(states, () -> new IntervalRateLimiter(timeSource)); + + // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group: + if (isUpdated && rateLimiter.tryIntervalLock(windowSize)) + { + try + { + // Update the rate limiter value based on the configured flow: + if (flow.equals(Flow.FAST)) + rateLimiter.limiter = currentMax; + else + rateLimiter.limiter = currentMin; + + tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states); + } + finally + { + rateLimiter.releaseIntervalLock(); + } + } + // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate + // limiting to be stable within the group itself. 
+ + // Finally apply the rate limit with a max pause time equal to the provided timeout minus the + // response time computed from the incoming rate, to reduce the number of client timeouts by taking into + // account how long it could take to process responses after back-pressure: + long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate); + doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos)); + } + catch (ExecutionException ex) + { + throw new IllegalStateException(ex); + } + } + } + + @Override + public RateBasedBackPressureState newState(InetAddress host) + { + return new RateBasedBackPressureState(host, timeSource, windowSize); + } + + @VisibleForTesting + RateLimiter getRateLimiterForReplicaGroup(Set<RateBasedBackPressureState> states) + { + IntervalRateLimiter rateLimiter = rateLimiters.getIfPresent(states); + return rateLimiter != null ? rateLimiter.limiter : RateLimiter.create(Double.POSITIVE_INFINITY); + } + + @VisibleForTesting + boolean doRateLimit(RateLimiter rateLimiter, long timeoutInNanos) + { + if (!rateLimiter.tryAcquire(1, timeoutInNanos, TimeUnit.NANOSECONDS)) + { + timeSource.sleepUninterruptibly(timeoutInNanos, TimeUnit.NANOSECONDS); + oneMinNoSpamLogger.info("Cannot apply {} due to exceeding write timeout, pausing {} nanoseconds instead.", + rateLimiter, timeoutInNanos); + + return false; + } + + return true; + } + + private static class IntervalRateLimiter extends IntervalLock + { + public volatile RateLimiter limiter = RateLimiter.create(Double.POSITIVE_INFINITY); + + IntervalRateLimiter(TimeSource timeSource) + { + super(timeSource); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java 
b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java new file mode 100644 index 0000000..c19f277 --- /dev/null +++ b/src/java/org/apache/cassandra/net/RateBasedBackPressureState.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.net; + +import java.net.InetAddress; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.RateLimiter; + +import org.apache.cassandra.utils.SlidingTimeRate; +import org.apache.cassandra.utils.TimeSource; +import org.apache.cassandra.utils.concurrent.IntervalLock; + +/** + * The rate-based back-pressure state, tracked per replica host. 
+ * <br/><br/> + * + * This back-pressure state is made up of the following attributes: + * <ul> + * <li>windowSize: the length of the back-pressure window in milliseconds.</li> + * <li>incomingRate: the rate of back-pressure supporting incoming messages.</li> + * <li>outgoingRate: the rate of back-pressure supporting outgoing messages.</li> + * <li>rateLimiter: the rate limiter to eventually apply to outgoing messages.</li> + * </ul> + * <br/> + * The incomingRate and outgoingRate are updated together when a response is received to guarantee consistency between + * the two. + * <br/> + * It also provides methods to exclusively lock/release back-pressure windows at given intervals; + * this allows to apply back-pressure even under concurrent modifications. Please also note a read lock is acquired + * during response processing so that no concurrent rate updates can screw rate computations. + */ +class RateBasedBackPressureState extends IntervalLock implements BackPressureState +{ + private final InetAddress host; + private final long windowSize; + final SlidingTimeRate incomingRate; + final SlidingTimeRate outgoingRate; + final RateLimiter rateLimiter; + + RateBasedBackPressureState(InetAddress host, TimeSource timeSource, long windowSize) + { + super(timeSource); + this.host = host; + this.windowSize = windowSize; + this.incomingRate = new SlidingTimeRate(timeSource, this.windowSize, this.windowSize / 10, TimeUnit.MILLISECONDS); + this.outgoingRate = new SlidingTimeRate(timeSource, this.windowSize, this.windowSize / 10, TimeUnit.MILLISECONDS); + this.rateLimiter = RateLimiter.create(Double.POSITIVE_INFINITY); + } + + @Override + public void onMessageSent(MessageOut<?> message) {} + + @Override + public void onResponseReceived() + { + readLock().lock(); + try + { + incomingRate.update(1); + outgoingRate.update(1); + } + finally + { + readLock().unlock(); + } + } + + @Override + public void onResponseTimeout() + { + readLock().lock(); + try + { + 
outgoingRate.update(1); + } + finally + { + readLock().unlock(); + } + } + + @Override + public double getBackPressureRateLimit() + { + return rateLimiter.getRate(); + } + + @Override + public InetAddress getHost() + { + return host; + } + + @Override + public boolean equals(Object obj) + { + if (obj instanceof RateBasedBackPressureState) + { + RateBasedBackPressureState other = (RateBasedBackPressureState) obj; + return this.host.equals(other.host); + } + return false; + } + + @Override + public int hashCode() + { + return this.host.hashCode(); + } + + @Override + public String toString() + { + return String.format("[host: %s, incoming rate: %.3f, outgoing rate: %.3f, rate limit: %.3f]", + host, incomingRate.get(TimeUnit.SECONDS), outgoingRate.get(TimeUnit.SECONDS), rateLimiter.getRate()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 89e1051..fe22e42 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -52,5 +52,10 @@ public class ResponseVerbHandler implements IVerbHandler MessagingService.instance().maybeAddLatency(cb, message.from, latency); cb.response(message); } + + if (callbackInfo.callback.supportsBackPressure()) + { + MessagingService.instance().updateBackPressureOnReceive(message.from, cb, false); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 
7cc854a..8c30b89 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import com.google.common.collect.Iterables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW private volatile int failures = 0; private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint; private final long queryStartNanoTime; + private volatile boolean supportsBackPressure = true; /** * @param callback A callback to be called when the write is successful. @@ -78,11 +80,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW public void get() throws WriteTimeoutException, WriteFailureException { - long requestTimeout = writeType == WriteType.COUNTER - ? DatabaseDescriptor.getCounterWriteRpcTimeout() - : DatabaseDescriptor.getWriteRpcTimeout(); - - long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime); + long timeout = currentTimeout(); boolean success; try @@ -112,6 +110,14 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW } } + public final long currentTimeout() + { + long requestTimeout = writeType == WriteType.COUNTER + ? DatabaseDescriptor.getCounterWriteRpcTimeout() + : DatabaseDescriptor.getWriteRpcTimeout(); + return TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime); + } + /** * @return the minimum number of endpoints that must reply. 
*/ @@ -172,4 +178,15 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW if (totalBlockFor() + n > totalEndpoints()) signal(); } + + @Override + public boolean supportsBackPressure() + { + return supportsBackPressure; + } + + public void setSupportsBackPressure(boolean supportsBackPressure) + { + this.supportsBackPressure = supportsBackPressure; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index ca8a9c6..241aa4f 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -25,6 +25,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import javax.management.MBeanServer; import javax.management.ObjectName; @@ -33,7 +34,9 @@ import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.Uninterruptibles; + import org.apache.commons.lang3.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -527,6 +530,7 @@ public class StorageProxy implements StorageProxyMBean { AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime); + responseHandler.setSupportsBackPressure(false); } MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer); @@ -1224,6 +1228,10 @@ public class StorageProxy implements StorageProxyMBean Stage stage) throws OverloadedException { + int targetsSize 
= Iterables.size(targets); + + // this dc replicas: + Collection<InetAddress> localDc = null; // extra-datacenter replicas, grouped by dc Map<String, Collection<InetAddress>> dcGroups = null; // only need to create a Message for non-local writes @@ -1232,6 +1240,8 @@ public class StorageProxy implements StorageProxyMBean boolean insertLocal = false; ArrayList<InetAddress> endpointsToHint = null; + List<InetAddress> backPressureHosts = null; + for (InetAddress destination : targets) { checkHintOverload(destination); @@ -1247,12 +1257,17 @@ public class StorageProxy implements StorageProxyMBean // belongs on a different server if (message == null) message = mutation.createMessage(); + String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); + // direct writes to local DC or old Cassandra versions // (1.1 knows how to forward old-style String message IDs; updated to int in 2.0) if (localDataCenter.equals(dc)) { - MessagingService.instance().sendRR(message, destination, responseHandler, true); + if (localDc == null) + localDc = new ArrayList<>(targetsSize); + + localDc.add(destination); } else { @@ -1264,8 +1279,14 @@ public class StorageProxy implements StorageProxyMBean dcGroups = new HashMap<>(); dcGroups.put(dc, messages); } + messages.add(destination); } + + if (backPressureHosts == null) + backPressureHosts = new ArrayList<>(targetsSize); + + backPressureHosts.add(destination); } } else @@ -1273,24 +1294,30 @@ public class StorageProxy implements StorageProxyMBean if (shouldHint(destination)) { if (endpointsToHint == null) - endpointsToHint = new ArrayList<>(Iterables.size(targets)); + endpointsToHint = new ArrayList<>(targetsSize); + endpointsToHint.add(destination); } } } + if (backPressureHosts != null) + MessagingService.instance().applyBackPressure(backPressureHosts, responseHandler.currentTimeout()); + if (endpointsToHint != null) submitHint(mutation, endpointsToHint, responseHandler); if (insertLocal) performLocally(stage, 
Optional.of(mutation), mutation::apply, responseHandler); + if (localDc != null) + { + for (InetAddress destination : localDc) + MessagingService.instance().sendRR(message, destination, responseHandler, true); + } if (dcGroups != null) { // for each datacenter, send the message to one node to relay the write to other replicas - if (message == null) - message = mutation.createMessage(); - for (Collection<InetAddress> dcTargets : dcGroups.values()) sendMessagesToNonlocalDC(message, dcTargets, responseHandler); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SlidingTimeRate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/SlidingTimeRate.java b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java new file mode 100644 index 0000000..3053a05 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/SlidingTimeRate.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.utils; + +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * Concurrent rate computation over a sliding time window. + */ +public class SlidingTimeRate +{ + private final ConcurrentSkipListMap<Long, AtomicInteger> counters = new ConcurrentSkipListMap<>(); + private final AtomicLong lastCounterTimestamp = new AtomicLong(0); + private final ReadWriteLock pruneLock = new ReentrantReadWriteLock(); + private final long sizeInMillis; + private final long precisionInMillis; + private final TimeSource timeSource; + + /** + * Creates a sliding rate whose time window is of the given size, with the given precision and time unit. + * <br/> + * The precision defines how accurate the rate computation is, as it will be computed over window size +/- + * precision. + */ + public SlidingTimeRate(TimeSource timeSource, long size, long precision, TimeUnit unit) + { + Preconditions.checkArgument(size > precision, "Size should be greater than precision."); + Preconditions.checkArgument(TimeUnit.MILLISECONDS.convert(precision, unit) >= 1, "Precision must be greater than or equal to 1 millisecond."); + this.sizeInMillis = TimeUnit.MILLISECONDS.convert(size, unit); + this.precisionInMillis = TimeUnit.MILLISECONDS.convert(precision, unit); + this.timeSource = timeSource; + } + + /** + * Updates the rate. 
+ */ + public void update(int delta) + { + pruneLock.readLock().lock(); + try + { + while (true) + { + long now = timeSource.currentTimeMillis(); + long lastTimestamp = lastCounterTimestamp.get(); + boolean isWithinPrecisionRange = (now - lastTimestamp) < precisionInMillis; + AtomicInteger lastCounter = counters.get(lastTimestamp); + // If there's a valid counter for the current last timestamp, and we're in the precision range, + // update such counter: + if (lastCounter != null && isWithinPrecisionRange) + { + lastCounter.addAndGet(delta); + + break; + } + // Else if there's no counter or we're past the precision range, try to create a new counter, + // but only the thread updating the last timestamp will create a new counter: + else if (lastCounterTimestamp.compareAndSet(lastTimestamp, now)) + { + AtomicInteger existing = counters.putIfAbsent(now, new AtomicInteger(delta)); + if (existing != null) + { + existing.addAndGet(delta); + } + + break; + } + } + } + finally + { + pruneLock.readLock().unlock(); + } + } + + /** + * Gets the current rate in the given time unit from the beginning of the time window to the + * provided point in time ago. + */ + public double get(long toAgo, TimeUnit unit) + { + pruneLock.readLock().lock(); + try + { + long toAgoInMillis = TimeUnit.MILLISECONDS.convert(toAgo, unit); + Preconditions.checkArgument(toAgoInMillis < sizeInMillis, "Cannot get rate in the past!"); + + long now = timeSource.currentTimeMillis(); + long sum = 0; + ConcurrentNavigableMap<Long, AtomicInteger> tailCounters = counters + .tailMap(now - sizeInMillis, true) + .headMap(now - toAgoInMillis, true); + for (AtomicInteger i : tailCounters.values()) + { + sum += i.get(); + } + + double rateInMillis = sum == 0 + ? 
sum + : sum / (double) Math.max(1000, (now - toAgoInMillis) - tailCounters.firstKey()); + double multiplier = TimeUnit.MILLISECONDS.convert(1, unit); + return rateInMillis * multiplier; + } + finally + { + pruneLock.readLock().unlock(); + } + } + + /** + * Gets the current rate in the given time unit. + */ + public double get(TimeUnit unit) + { + return get(0, unit); + } + + /** + * Prunes the time window of old unused updates. + */ + public void prune() + { + pruneLock.writeLock().lock(); + try + { + long now = timeSource.currentTimeMillis(); + counters.headMap(now - sizeInMillis, false).clear(); + } + finally + { + pruneLock.writeLock().unlock(); + } + } + + @VisibleForTesting + public int size() + { + return counters.values().stream().reduce(new AtomicInteger(), (v1, v2) -> { + v1.addAndGet(v2.get()); + return v1; + }).get(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/SystemTimeSource.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/SystemTimeSource.java b/src/java/org/apache/cassandra/utils/SystemTimeSource.java new file mode 100644 index 0000000..fef525e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/SystemTimeSource.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; + +/** + * Time source backed by JVM clock. + */ +public class SystemTimeSource implements TimeSource +{ + @Override + public long currentTimeMillis() + { + return System.currentTimeMillis(); + } + + @Override + public long nanoTime() + { + return System.nanoTime(); + } + + @Override + public TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit) + { + Uninterruptibles.sleepUninterruptibly(sleepFor, unit); + return this; + } + + @Override + public TimeSource sleep(long sleepFor, TimeUnit unit) throws InterruptedException + { + TimeUnit.NANOSECONDS.sleep(TimeUnit.NANOSECONDS.convert(sleepFor, unit)); + return this; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TestRateLimiter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/TestRateLimiter.java b/src/java/org/apache/cassandra/utils/TestRateLimiter.java new file mode 100644 index 0000000..a9eb871 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/TestRateLimiter.java @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.utils; + +import java.util.concurrent.atomic.AtomicReference; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.RateLimiter; + +import org.jboss.byteman.rule.Rule; +import org.jboss.byteman.rule.helper.Helper; + +/** + * Helper class to apply rate limiting during fault injection testing; + * for an example script, see test/resources/byteman/mutation_limiter.btm. + */ +@VisibleForTesting +public class TestRateLimiter extends Helper +{ + private static final AtomicReference<RateLimiter> ref = new AtomicReference<>(); + + protected TestRateLimiter(Rule rule) + { + super(rule); + } + + /** + * Acquires a single unit at the given rate. If the rate changes between calls, a new rate limiter is created + * and the old one is discarded. 
+ */ + public void acquire(double rate) + { + RateLimiter limiter = ref.get(); + if (limiter == null || limiter.getRate() != rate) + { + ref.compareAndSet(limiter, RateLimiter.create(rate)); + limiter = ref.get(); + } + limiter.acquire(1); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/TimeSource.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/TimeSource.java b/src/java/org/apache/cassandra/utils/TimeSource.java new file mode 100644 index 0000000..5d8acec --- /dev/null +++ b/src/java/org/apache/cassandra/utils/TimeSource.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.concurrent.TimeUnit; + +public interface TimeSource +{ + /** + * + * @return the current time in milliseconds + */ + long currentTimeMillis(); + + /** + * + * @return Returns the current time value in nanoseconds. + * + * <p>This method can only be used to measure elapsed time and is + * not related to any other notion of system or wall-clock time. + */ + long nanoTime(); + + /** + * Sleep for the given amount of time uninterruptibly. 
+ * + * @param sleepFor given amount. + * @param unit time unit + * @return The time source itself after the given sleep period. + */ + TimeSource sleepUninterruptibly(long sleepFor, TimeUnit unit); + + /** + * Sleep for the given amount of time. This operation could be interrupted. + * Hence after returning from this method, it is not guaranteed + * that the requested amount of time has passed. + * + * @param sleepFor given amount. + * @param unit time unit + * @return The time source itself after the given sleep period. + */ + TimeSource sleep(long sleepFor, TimeUnit unit) throws InterruptedException; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java new file mode 100644 index 0000000..382a2dc --- /dev/null +++ b/src/java/org/apache/cassandra/utils/concurrent/IntervalLock.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.utils.concurrent; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.utils.TimeSource; + +/** + * This class extends ReentrantReadWriteLock to provide a write lock that can only be acquired at provided intervals. + */ +public class IntervalLock extends ReentrantReadWriteLock +{ + private final AtomicLong lastAcquire = new AtomicLong(); + private final TimeSource timeSource; + + public IntervalLock(TimeSource timeSource) + { + this.timeSource = timeSource; + } + + /** + * Try acquiring a write lock if the given interval has passed since the last successful acquisition through this method. + * + * @param interval In millis. + * @return True if acquired and locked, false otherwise. + */ + public boolean tryIntervalLock(long interval) + { + long now = timeSource.currentTimeMillis(); + boolean acquired = (now - lastAcquire.get() >= interval) && writeLock().tryLock(); + if (acquired) + lastAcquire.set(now); + + return acquired; + } + + /** + * Release the last acquired interval lock. 
+ */ + public void releaseIntervalLock() + { + writeLock().unlock(); + } + + @VisibleForTesting + public long getLastIntervalAcquire() + { + return lastAcquire.get(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/resources/byteman/mutation_limiter.btm ---------------------------------------------------------------------- diff --git a/test/resources/byteman/mutation_limiter.btm b/test/resources/byteman/mutation_limiter.btm new file mode 100644 index 0000000..ca936fc --- /dev/null +++ b/test/resources/byteman/mutation_limiter.btm @@ -0,0 +1,8 @@ +RULE mutation_limiter +CLASS org.apache.cassandra.db.MutationVerbHandler +METHOD doVerb +HELPER org.apache.cassandra.utils.TestRateLimiter +AT ENTRY +IF TRUE +DO acquire(1000.0) +ENDRULE http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 63c44e0..2dcfbd1 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -101,6 +101,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.io.util.DiskOptimizationStrategy", "org.apache.cassandra.locator.SimpleSeedProvider", "org.apache.cassandra.locator.SeedProvider", + "org.apache.cassandra.net.BackPressureStrategy", "org.apache.cassandra.scheduler.IRequestScheduler", "org.apache.cassandra.security.EncryptionContext", "org.apache.cassandra.service.CacheService$CacheType", http://git-wip-us.apache.org/repos/asf/cassandra/blob/d43b9ce5/test/unit/org/apache/cassandra/net/MessagingServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java 
b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java index 0e0e4ba..2a3ecbe 100644 --- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java +++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java @@ -24,34 +24,53 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Iterables; + import com.codahale.metrics.Timer; import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; import org.apache.cassandra.io.util.DataOutputStreamPlus; import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; import org.caffinitas.ohc.histo.EstimatedHistogram; + +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.*; - import org.apache.cassandra.config.DatabaseDescriptor; +import static org.junit.Assert.*; + public class MessagingServiceTest { + /** One second expressed in nanoseconds. */ private final static long ONE_SECOND = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); + private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets(); + private final MessagingService messagingService = MessagingService.test(); + @BeforeClass - public static void initDD() + public static void beforeClass() throws UnknownHostException { DatabaseDescriptor.daemonInitialization(); + /* Install the mock strategy declared further down in this class as the configured back-pressure strategy. */ DatabaseDescriptor.setBackPressureStrategy(new MockBackPressureStrategy(Collections.emptyMap())); + /* 127.0.0.1 becomes the broadcast address; testDoesntApplyBackPressureToBroadcastAddress relies on this. */ DatabaseDescriptor.setBroadcastAddress(InetAddress.getByName("127.0.0.1")); } - private final MessagingService messagingService = MessagingService.test(); - private final static long[] bucketOffsets = new EstimatedHistogram(160).getBucketOffsets(); + /** Runs before every test: clears the static applied flag of the mock strategy and destroys the connection pools for 127.0.0.2 and 127.0.0.3 (presumably so that each test obtains a fresh back-pressure state; confirm against destroyConnectionPool). */ @Before + public void
before() throws UnknownHostException + { + MockBackPressureStrategy.applied = false; + messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.2")); + messagingService.destroyConnectionPool(InetAddress.getByName("127.0.0.3")); + } @Test public void testDroppedMessages() @@ -77,17 +96,6 @@ public class MessagingServiceTest assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString())); } - private static void addDCLatency(long sentAt, long now) throws IOException - { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos)) - { - out.writeInt((int) sentAt); - } - DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray())); - MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now); - } - @Test public void testDCLatency() throws Exception { @@ -123,4 +131,211 @@ public class MessagingServiceTest addDCLatency(sentAt, now); assertNull(dcLatency.get("datacenter1")); } + + /** The onSend flag of the mock state must be set only when back-pressure is enabled and the callback reports supportsBackPressure() as true. */ @Test + public void testUpdatesBackPressureOnSendWhenEnabledAndWithSupportedCallback() throws UnknownHostException + { + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState(); + IAsyncCallback bpCallback = new BackPressureCallback(); + IAsyncCallback noCallback = new NoBackPressureCallback(); + MessageOut<?> ignored = null; + + /* Enabled, callback without back-pressure support: the flag stays false. */ DatabaseDescriptor.setBackPressureEnabled(true); + messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), noCallback, ignored); + assertFalse(backPressureState.onSend); + + /* Disabled, callback with back-pressure support: the flag stays false. */ DatabaseDescriptor.setBackPressureEnabled(false); + messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored); + assertFalse(backPressureState.onSend); + + /* Enabled, callback with back-pressure support: the flag is set. */ DatabaseDescriptor.setBackPressureEnabled(true); +
messagingService.updateBackPressureOnSend(InetAddress.getByName("127.0.0.2"), bpCallback, ignored); + assertTrue(backPressureState.onSend); + } + + /** With timeout set to false, only the onReceive flag may be set, and only when back-pressure is enabled and the callback supports it; onTimeout must stay false throughout. */ @Test + public void testUpdatesBackPressureOnReceiveWhenEnabledAndWithSupportedCallback() throws UnknownHostException + { + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState(); + IAsyncCallback bpCallback = new BackPressureCallback(); + IAsyncCallback noCallback = new NoBackPressureCallback(); + boolean timeout = false; + + DatabaseDescriptor.setBackPressureEnabled(true); + messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), noCallback, timeout); + assertFalse(backPressureState.onReceive); + assertFalse(backPressureState.onTimeout); + + DatabaseDescriptor.setBackPressureEnabled(false); + messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + assertFalse(backPressureState.onReceive); + assertFalse(backPressureState.onTimeout); + + DatabaseDescriptor.setBackPressureEnabled(true); + messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + assertTrue(backPressureState.onReceive); + assertFalse(backPressureState.onTimeout); + } + + /** With timeout set to true, only the onTimeout flag may be set, and only when back-pressure is enabled and the callback supports it; onReceive must stay false throughout. */ @Test + public void testUpdatesBackPressureOnTimeoutWhenEnabledAndWithSupportedCallback() throws UnknownHostException + { + MockBackPressureStrategy.MockBackPressureState backPressureState = (MockBackPressureStrategy.MockBackPressureState) messagingService.getConnectionPool(InetAddress.getByName("127.0.0.2")).getBackPressureState(); + IAsyncCallback bpCallback = new BackPressureCallback(); + IAsyncCallback noCallback = new NoBackPressureCallback(); + boolean timeout = true; + + DatabaseDescriptor.setBackPressureEnabled(true); + messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"),
noCallback, timeout); + assertFalse(backPressureState.onReceive); + assertFalse(backPressureState.onTimeout); + + DatabaseDescriptor.setBackPressureEnabled(false); + messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + assertFalse(backPressureState.onReceive); + assertFalse(backPressureState.onTimeout); + + DatabaseDescriptor.setBackPressureEnabled(true); + messagingService.updateBackPressureOnReceive(InetAddress.getByName("127.0.0.2"), bpCallback, timeout); + assertFalse(backPressureState.onReceive); + assertTrue(backPressureState.onTimeout); + } + + /** applyBackPressure must reach the strategy only while back-pressure is enabled. */ @Test + public void testAppliesBackPressureWhenEnabled() throws UnknownHostException + { + DatabaseDescriptor.setBackPressureEnabled(false); + messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND); + assertFalse(MockBackPressureStrategy.applied); + + DatabaseDescriptor.setBackPressureEnabled(true); + messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.2")), ONE_SECOND); + assertTrue(MockBackPressureStrategy.applied); + } + + /** Even with back-pressure enabled, targeting only 127.0.0.1 (the broadcast address set in beforeClass) must leave the applied flag of the mock strategy false. */ @Test + public void testDoesntApplyBackPressureToBroadcastAddress() throws UnknownHostException + { + DatabaseDescriptor.setBackPressureEnabled(true); + messagingService.applyBackPressure(Arrays.asList(InetAddress.getByName("127.0.0.1")), ONE_SECOND); + assertFalse(MockBackPressureStrategy.applied); + } + + /** Writes sentAt (narrowed to an int) into an in-memory buffer and passes that buffer to MessageIn.readTimestamp together with the local host address and now. */ private static void addDCLatency(long sentAt, long now) throws IOException + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos)) + { + out.writeInt((int) sentAt); + } + DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray())); + MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now); + } + + /** Test double for BackPressureStrategy: records through the static applied flag whether apply was called with a non-empty set of states, and creates MockBackPressureState instances. */ public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState> + { + public static volatile
boolean applied = false; /* Static, hence shared by every instance; reset to false in before(). */ + + /** The args map is ignored. */ public MockBackPressureStrategy(Map<String, Object> args) + { + } + + /** Sets the applied flag only when at least one state is passed; timeout and unit are not used. */ @Override + public void apply(Set<MockBackPressureState> states, long timeout, TimeUnit unit) + { + if (!Iterables.isEmpty(states)) + applied = true; + } + + @Override + public MockBackPressureState newState(InetAddress host) + { + return new MockBackPressureState(host); + } + + /** Back-pressure state that only records, in volatile flags, which notification methods were invoked. */ public static class MockBackPressureState implements BackPressureState + { + private final InetAddress host; + public volatile boolean onSend = false; + public volatile boolean onReceive = false; + public volatile boolean onTimeout = false; + + private MockBackPressureState(InetAddress host) + { + this.host = host; + } + + @Override + public void onMessageSent(MessageOut<?> message) + { + onSend = true; + } + + @Override + public void onResponseReceived() + { + onReceive = true; + } + + @Override + public void onResponseTimeout() + { + onTimeout = true; + } + + /** Not implemented by this mock: always throws UnsupportedOperationException. */ @Override + public double getBackPressureRateLimit() + { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public InetAddress getHost() + { + return host; + } + } + } + + /** Callback whose supportsBackPressure() returns true; response always throws UnsupportedOperationException. */ private static class BackPressureCallback implements IAsyncCallback + { + @Override + public boolean supportsBackPressure() + { + return true; + } + + @Override + public boolean isLatencyForSnitch() + { + return false; + } + + @Override + public void response(MessageIn msg) + { + throw new UnsupportedOperationException("Not supported."); + } + } + + /** Callback whose supportsBackPressure() returns false; response always throws UnsupportedOperationException. */ private static class NoBackPressureCallback implements IAsyncCallback + { + @Override + public boolean supportsBackPressure() + { + return false; + } + + @Override + public boolean isLatencyForSnitch() + { + return false; + } + + @Override + public void response(MessageIn msg) + { + throw new UnsupportedOperationException("Not supported."); + } + } }