Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
        CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1c2536c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1c2536c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1c2536c

Branch: refs/heads/cassandra-2.1
Commit: b1c2536ccb244be078cddc0da6c788d2b7c0f4ec
Parents: 7902568 e56d9ef
Author: Joshua McKenzie <jmcken...@apache.org>
Authored: Thu Mar 5 11:32:41 2015 -0600
Committer: Joshua McKenzie <jmcken...@apache.org>
Committed: Thu Mar 5 11:32:41 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   8 ++
 .../org/apache/cassandra/config/Config.java     |   2 +
 .../cassandra/config/DatabaseDescriptor.java    |  19 ++++
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../cassandra/service/StorageProxyMBean.java    |   3 +
 .../transport/ConnectionLimitHandler.java       | 108 +++++++++++++++++++
 .../org/apache/cassandra/transport/Server.java  |   9 ++
 8 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 137c0f1,4e34c9e..aa2e1f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,5 +1,37 @@@
 -2.0.13:
 +2.1.4
 + * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
 + * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
 + * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
 + * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
 + * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
 + * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
 + * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
 + * Fix parallelism adjustment in range and secondary index queries
 +   when the first fetch does not satisfy the limit (CASSANDRA-8856)
 + * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
 + * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
 + * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
 + * Improve assertions in Memory (CASSANDRA-8792)
 + * Fix SSTableRewriter cleanup (CASSANDRA-8802)
 + * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
 + * 'nodetool info' prints exception against older node (CASSANDRA-8796)
 + * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
 + * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
 + * Enforce SSTableReader.first/last (CASSANDRA-8744)
 + * Cleanup SegmentedFile API (CASSANDRA-8749)
 + * Avoid overlap with early compaction replacement (CASSANDRA-8683)
 + * Safer Resource Management++ (CASSANDRA-8707)
 + * Write partition size estimates into a system table (CASSANDRA-7688)
 + * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
 +   (CASSANDRA-8154)
 + * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
 + * IndexSummaryBuilder utilises offheap memory, and shares data between
 +   each IndexSummary opened from it (CASSANDRA-8757)
 + * markCompacting only succeeds if the exact SSTableReader instances being 
 +   marked are in the live set (CASSANDRA-8689)
 + * cassandra-stress support for varint (CASSANDRA-8882)
 +Merged from 2.0:
+  * Add ability to limit number of native connections (CASSANDRA-8086)
   * Add offline tool to relevel sstables (CASSANDRA-8301)
   * Preserve stream ID for more protocol errors (CASSANDRA-8848)
   * Fix combining token() function with multi-column relations on

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/conf/cassandra.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/service/StorageProxyMBean.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
index 0000000,c45d2cb..7bcf280
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
+++ b/src/java/org/apache/cassandra/transport/ConnectionLimitHandler.java
@@@ -1,0 -1,111 +1,108 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.transport;
+ 
+ 
++import io.netty.channel.ChannelHandler;
++import io.netty.channel.ChannelHandlerContext;
++import io.netty.channel.ChannelInboundHandlerAdapter;
+ import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.jboss.netty.channel.ChannelHandler;
 -import org.jboss.netty.channel.ChannelHandlerContext;
 -import org.jboss.netty.channel.ChannelStateEvent;
 -import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import java.net.InetAddress;
+ import java.net.InetSocketAddress;
 -import java.util.HashMap;
 -import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ 
+ /**
 - * {@link SimpleChannelUpstreamHandler} implementation which allows to limit 
the number of concurrent
++ * {@link ChannelInboundHandlerAdapter} implementation which allows to limit 
the number of concurrent
+  * connections to the Server. Be aware this <strong>MUST</strong> be shared 
between all child channels.
+  */
+ @ChannelHandler.Sharable
 -final class ConnectionLimitHandler extends SimpleChannelUpstreamHandler
++final class ConnectionLimitHandler extends ChannelInboundHandlerAdapter
+ {
+     private static final Logger logger = 
LoggerFactory.getLogger(ConnectionLimitHandler.class);
+     private final ConcurrentMap<InetAddress, AtomicLong> connectionsPerClient 
= new ConcurrentHashMap<>();
+     private final AtomicLong counter = new AtomicLong(0);
+ 
+     @Override
 -    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent 
event) throws Exception
++    public void channelActive(ChannelHandlerContext ctx) throws Exception
+     {
+         final long count = counter.incrementAndGet();
+         long limit = 
DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
+         // Setting the limit to -1 disables it.
+         if(limit < 0)
+         {
+             limit = Long.MAX_VALUE;
+         }
+         if (count > limit)
+         {
+             // The decrement will be done in channelClosed(...)
+             logger.warn("Exceeded maximum native connection limit of {} by 
using {} connections", limit, count);
 -            ctx.getChannel().close();
++            ctx.close();
+         }
+         else
+         {
+             long perIpLimit = 
DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
+             if (perIpLimit > 0)
+             {
 -                InetAddress address = ((InetSocketAddress) 
ctx.getChannel().getRemoteAddress()).getAddress();
++                InetAddress address = ((InetSocketAddress) 
ctx.channel().remoteAddress()).getAddress();
+ 
+                 AtomicLong perIpCount = connectionsPerClient.get(address);
+                 if (perIpCount == null)
+                 {
+                     perIpCount = new AtomicLong(0);
+ 
+                     AtomicLong old = 
connectionsPerClient.putIfAbsent(address, perIpCount);
+                     if (old != null)
+                     {
+                         perIpCount = old;
+                     }
+                 }
+                 if (perIpCount.incrementAndGet() > perIpLimit)
+                 {
+                     // The decrement will be done in channelClosed(...)
+                     logger.warn("Exceeded maximum native connection limit per 
ip of {} by using {} connections", perIpLimit, perIpCount);
 -                    ctx.getChannel().close();
++                    ctx.close();
+                     return;
+                 }
+             }
 -            super.channelOpen(ctx, event);
++            ctx.fireChannelActive();
+         }
+     }
+ 
+     @Override
 -    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent 
event) throws Exception
++    public void channelInactive(ChannelHandlerContext ctx) throws Exception
+     {
+         counter.decrementAndGet();
 -        InetAddress address = ((InetSocketAddress) 
ctx.getChannel().getRemoteAddress()).getAddress();
++        InetAddress address = ((InetSocketAddress) 
ctx.channel().remoteAddress()).getAddress();
+ 
+         AtomicLong count = connectionsPerClient.get(address);
+         if (count != null)
+         {
+             if (count.decrementAndGet() <= 0)
+             {
+                 connectionsPerClient.remove(address);
+             }
+         }
 -        super.channelClosed(ctx, event);
++        ctx.fireChannelInactive();
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1c2536c/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/Server.java
index 60d3e70,30b8a9d..f396fd9
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -278,10 -249,18 +279,18 @@@ public class Server implements Cassandr
              this.server = server;
          }
  
 -        public ChannelPipeline getPipeline() throws Exception
 +        protected void initChannel(Channel channel) throws Exception
          {
 -            ChannelPipeline pipeline = Channels.pipeline();
 +            ChannelPipeline pipeline = channel.pipeline();
  
+             // Add the ConnectionLimitHandler to the pipeline if configured 
to do so.
+             if 
(DatabaseDescriptor.getNativeTransportMaxConcurrentConnections() > 0
+                     || 
DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp() > 0)
+             {
+                 // Add as first to the pipeline so the limit is enforced as 
first action.
+                 pipeline.addFirst("connectionLimitHandler", 
connectionLimitHandler);
+             }
+ 
              //pipeline.addLast("debug", new LoggingHandler());
  
              pipeline.addLast("frameDecoder", new 
Frame.Decoder(server.connectionFactory));

Reply via email to