Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c2536c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c2536c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c2536c Branch: refs/heads/cassandra-2.1 Commit: b1c2536ccb244be078cddc0da6c788d2b7c0f4ec Parents: 7902568 e56d9ef Author: Joshua McKenzie <jmcken...@apache.org> Authored: Thu Mar 5 11:32:41 2015 -0600 Committer: Joshua McKenzie <jmcken...@apache.org> Committed: Thu Mar 5 11:32:41 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 8 ++ .../org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 19 ++++ .../apache/cassandra/service/StorageProxy.java | 8 +- .../cassandra/service/StorageProxyMBean.java | 3 + .../transport/ConnectionLimitHandler.java | 108 +++++++++++++++++++ .../org/apache/cassandra/transport/Server.java | 9 ++ 8 files changed, 157 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 137c0f1,4e34c9e..aa2e1f9 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,36 -1,5 +1,37 @@@ -2.0.13: +2.1.4 + * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832) + * Remove cold_reads_to_omit from STCS (CASSANDRA-8860) + * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883) + * Fix top partitions reporting wrong cardinality (CASSANDRA-8834) + * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067) + * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366) + * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856) + * Fix parallelism adjustment in range and secondary index queries + when the first fetch does not satisfy the limit (CASSANDRA-8856) + * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843) + * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842) + * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812) + * Improve assertions in Memory (CASSANDRA-8792) + * Fix SSTableRewriter cleanup (CASSANDRA-8802) + * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758) + * 'nodetool info' prints exception against older node (CASSANDRA-8796) + * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750) + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747) + * Enforce SSTableReader.first/last (CASSANDRA-8744) + * Cleanup SegmentedFile API (CASSANDRA-8749) + * Avoid overlap with early compaction replacement (CASSANDRA-8683) + * Safer Resource Management++ (CASSANDRA-8707) + * Write partition size estimates into a system table (CASSANDRA-7688) + * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output + (CASSANDRA-8154) + * Show progress of streaming in nodetool netstats (CASSANDRA-8886) + * IndexSummaryBuilder utilises offheap memory, and shares data between + each IndexSummary opened from it (CASSANDRA-8757) + * markCompacting only succeeds if the exact SSTableReader instances being + marked are in the live set (CASSANDRA-8689) + * cassandra-stress support for varint (CASSANDRA-8882) +Merged from 2.0: + * Add ability to limit number of native connections (CASSANDRA-8086) * Add offline tool to relevel sstables (CASSANDRA-8301) * Preserve stream ID for more protocol errors (CASSANDRA-8848) * Fix combining token() function with multi-column relations on http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/conf/cassandra.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxyMBean.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java index 0000000,c45d2cb..7bcf280 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java +++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java @@@ -1,0 -1,111 +1,108 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.transport; + + ++import io.netty.channel.ChannelHandler; ++import io.netty.channel.ChannelHandlerContext; ++import io.netty.channel.ChannelInboundHandlerAdapter; + import org.apache.cassandra.config.DatabaseDescriptor; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.net.InetAddress; + import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.atomic.AtomicLong; + + + /** - * {@link SimpleChannelUpstreamHandler} implementation which allows to limit the number of concurrent ++ * {@link ChannelInboundHandlerAdapter} implementation which allows to limit the number of concurrent + * connections to the Server. Be aware this <strong>MUST</strong> be shared between all child channels. + */ + @ChannelHandler.Sharable -final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler ++final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter + { + private static final Logger logger = LoggerFactory.getLogger(ConnectionLimitHandler.class); + private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient = new ConcurrentHashMap<>(); + private final AtomicLong counter = new AtomicLong(0); + + @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception ++ public void channelActive(ChannelHandlerContext ctx) throws Exception + { + final long count = counter.incrementAndGet(); + long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections(); + // Setting the limit to -1 disables it. + if(limit < 0) + { + limit = Long.MAX_VALUE; + } + if (count > limit) + { + // The decrement will be done in channelClosed(...) + logger.warn("Exceeded maximum native connection limit of {} by using {} connections", limit, count); - ctx.getChannel().close(); ++ ctx.close(); + } + else + { + long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp(); + if (perIpLimit > 0) + { - InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress(); ++ InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress(); + + AtomicLong perIpCount = connectionsPerClient.get(address); + if (perIpCount == null) + { + perIpCount = new AtomicLong(0); + + AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount); + if (old != null) + { + perIpCount = old; + } + } + if (perIpCount.incrementAndGet() > perIpLimit) + { + // The decrement will be done in channelClosed(...) + logger.warn("Exceeded maximum native connection limit per ip of {} by using {} connections", perIpLimit, perIpCount); - ctx.getChannel().close(); ++ ctx.close(); + return; + } + } - super.channelOpen(ctx, event); ++ ctx.fireChannelActive(); + } + } + + @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception ++ public void channelInactive(ChannelHandlerContext ctx) throws Exception + { + counter.decrementAndGet(); - InetAddress address = ((InetSocketAddress) ctx.getChannel().getRemoteAddress()).getAddress(); ++ InetAddress address = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress(); + + AtomicLong count = connectionsPerClient.get(address); + if (count != null) + { + if (count.decrementAndGet() <= 0) + { + connectionsPerClient.remove(address); + } + } - super.channelClosed(ctx, event); ++ ctx.fireChannelInactive(); + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/transport/Server.java index 60d3e70,30b8a9d..f396fd9 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@@ -278,10 -249,18 +279,18 @@@ public class Server implements Cassandr this.server = server; } - public ChannelPipeline getPipeline() throws Exception + protected void initChannel(Channel channel) throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); + ChannelPipeline pipeline = channel.pipeline(); + // Add the ConnectionLimitHandler to the pipeline if configured to do so. + if (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0 + || DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0) + { + // Add as first to the pipeline so the limit is enforced as first action. + pipeline.addFirst("connectionLimitHandler", connectionLimitHandler); + } + //pipeline.addLast("debug", new LoggingHandler()); pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));