Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a0e97df
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a0e97df
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a0e97df

Branch: refs/heads/trunk
Commit: 0a0e97df548db21ba8ed4469244c878b4da3ed93
Parents: 863dbc7 142f358
Author: Sam Tunnicliffe <[email protected]>
Authored: Mon Jun 20 14:05:13 2016 +0100
Committer: Sam Tunnicliffe <[email protected]>
Committed: Mon Jun 20 14:11:34 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/service/StorageService.java       | 137 +++----------------
 .../org/apache/cassandra/transport/Server.java  |  22 ++-
 3 files changed, 37 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0e97df/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 4c0d9a0,76e601c..7873742
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,31 -1,6 +1,32 @@@
 -2.2.7
 +3.0.8
 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
 +Merged from 2.2:
+  * Don't send erroneous NEW_NODE notifications on restart (CASSANDRA-11038)
   * StorageService shutdown hook should use a volatile variable 
(CASSANDRA-11984)
 +Merged from 2.1:
 + * Cache local ranges when calculating repair neighbors (CASSANDRA-11934)
 + * Allow LWT operation on static column with only partition keys 
(CASSANDRA-10532)
 + * Create interval tree over canonical sstables to avoid missing sstables 
during streaming (CASSANDRA-11886)
 + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid 
corrupting SSL connections (CASSANDRA-11749)
 +
 +
 +3.0.7
 + * Fix legacy serialization of Thrift-generated non-compound range tombstones
 +   when communicating with 2.x nodes (CASSANDRA-11930)
 + * Fix Directories instantiations where CFS.initialDirectories should be used 
(CASSANDRA-11849)
 + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
 + * Fix sstables not being protected from removal during index build 
(CASSANDRA-11905)
 + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
 + * Remove unneeded code to repair index summaries that have
 +   been improperly down-sampled (CASSANDRA-11127)
 + * Avoid WriteTimeoutExceptions during commit log replay due to materialized
 +   view lock contention (CASSANDRA-11891)
 + * Prevent OOM failures on SSTable corruption, improve tests for corruption 
detection (CASSANDRA-9530)
 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
 + * Allow compaction strategies to disable early open (CASSANDRA-11754)
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
   * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
   * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0e97df/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 5167151,a877074..394220d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -26,37 -22,12 +22,13 @@@ import java.lang.management.ManagementF
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.EnumMap;
- import java.util.HashMap;
- import java.util.HashSet;
- import java.util.Iterator;
- import java.util.LinkedHashMap;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
+ import java.util.*;
  import java.util.Map.Entry;
- import java.util.Set;
- import java.util.SortedMap;
- import java.util.TreeMap;
- import java.util.UUID;
- import java.util.concurrent.CopyOnWriteArrayList;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Future;
- import java.util.concurrent.FutureTask;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
+ import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicInteger;
 +import javax.annotation.Nullable;
- import javax.management.JMX;
- import javax.management.MBeanServer;
- import javax.management.NotificationBroadcasterSupport;
- import javax.management.ObjectName;
+ import javax.management.*;
  import javax.management.openmbean.TabularData;
  import javax.management.openmbean.TabularDataSupport;
  
@@@ -91,71 -48,22 +52,28 @@@ import org.apache.cassandra.batchlog.Ba
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.config.*;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
- import org.apache.cassandra.db.ColumnFamilyStore;
- import org.apache.cassandra.db.CounterMutationVerbHandler;
- import org.apache.cassandra.db.DecoratedKey;
- import org.apache.cassandra.db.DefinitionsUpdateVerbHandler;
- import org.apache.cassandra.db.Keyspace;
- import org.apache.cassandra.db.MigrationRequestVerbHandler;
- import org.apache.cassandra.db.MutationVerbHandler;
- import org.apache.cassandra.db.RangeSliceVerbHandler;
- import org.apache.cassandra.db.ReadCommandVerbHandler;
- import org.apache.cassandra.db.ReadRepairVerbHandler;
- import org.apache.cassandra.db.SchemaCheckVerbHandler;
- import org.apache.cassandra.db.SizeEstimatesRecorder;
- import org.apache.cassandra.db.SnapshotDetailsTabularData;
- import org.apache.cassandra.db.SystemKeyspace;
- import org.apache.cassandra.db.TruncateVerbHandler;
+ import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
- import org.apache.cassandra.dht.BootStrapper;
- import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.dht.*;
  import org.apache.cassandra.dht.Range;
- import org.apache.cassandra.dht.RangeStreamer;
- import org.apache.cassandra.dht.RingPosition;
- import org.apache.cassandra.dht.StreamStateStore;
- import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.dht.Token.TokenFactory;
- import org.apache.cassandra.exceptions.AlreadyExistsException;
- import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.exceptions.InvalidRequestException;
- import org.apache.cassandra.exceptions.UnavailableException;
- import org.apache.cassandra.gms.ApplicationState;
- import org.apache.cassandra.gms.EndpointState;
- import org.apache.cassandra.gms.FailureDetector;
- import org.apache.cassandra.gms.GossipDigestAck2VerbHandler;
- import org.apache.cassandra.gms.GossipDigestAckVerbHandler;
- import org.apache.cassandra.gms.GossipDigestSynVerbHandler;
- import org.apache.cassandra.gms.GossipShutdownVerbHandler;
- import org.apache.cassandra.gms.Gossiper;
- import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
- import org.apache.cassandra.gms.IFailureDetector;
- import org.apache.cassandra.gms.TokenSerializer;
- import org.apache.cassandra.gms.VersionedValue;
+ import org.apache.cassandra.exceptions.*;
+ import org.apache.cassandra.gms.*;
 -import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 +import org.apache.cassandra.hints.HintVerbHandler;
 +import org.apache.cassandra.hints.HintsService;
  import org.apache.cassandra.io.sstable.SSTableLoader;
  import org.apache.cassandra.io.util.FileUtils;
- import org.apache.cassandra.locator.AbstractReplicationStrategy;
- import org.apache.cassandra.locator.DynamicEndpointSnitch;
- import org.apache.cassandra.locator.IEndpointSnitch;
- import org.apache.cassandra.locator.LocalStrategy;
- import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.locator.*;
  import org.apache.cassandra.metrics.StorageMetrics;
- import org.apache.cassandra.net.AsyncOneResponse;
- import org.apache.cassandra.net.MessageOut;
- import org.apache.cassandra.net.MessagingService;
- import org.apache.cassandra.net.ResponseVerbHandler;
- import org.apache.cassandra.repair.RepairMessageVerbHandler;
- import org.apache.cassandra.repair.RepairParallelism;
- import org.apache.cassandra.repair.RepairRunnable;
- import org.apache.cassandra.repair.SystemDistributedKeyspace;
+ import org.apache.cassandra.net.*;
+ import org.apache.cassandra.repair.*;
  import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.schema.KeyspaceMetadata;
  import org.apache.cassandra.service.paxos.CommitVerbHandler;
  import org.apache.cassandra.service.paxos.PrepareVerbHandler;
  import org.apache.cassandra.service.paxos.ProposeVerbHandler;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a0e97df/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/transport/Server.java
index 76aedb7,5c0d9d2..63194d0
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -21,9 -21,8 +21,7 @@@ import java.io.IOException
  import java.net.InetAddress;
  import java.net.InetSocketAddress;
  import java.net.UnknownHostException;
- import java.util.EnumMap;
- import java.util.List;
- import java.util.Map;
+ import java.util.*;
 -import java.util.concurrent.Callable;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.atomic.AtomicBoolean;
  import javax.net.ssl.SSLContext;
@@@ -35,9 -34,9 +33,8 @@@ import org.slf4j.LoggerFactory
  import io.netty.bootstrap.ServerBootstrap;
  import io.netty.buffer.ByteBuf;
  import io.netty.channel.*;
 -import io.netty.channel.epoll.Epoll;
  import io.netty.channel.epoll.EpollEventLoopGroup;
  import io.netty.channel.epoll.EpollServerSocketChannel;
- import io.netty.handler.codec.ByteToMessageDecoder;
  import io.netty.channel.group.ChannelGroup;
  import io.netty.channel.group.DefaultChannelGroup;
  import io.netty.channel.nio.NioEventLoopGroup;
@@@ -458,9 -429,14 +456,13 @@@ public class Server implements Cassandr
      {
          private final Server server;
  
-         // We keep track of the latest events we have sent to avoid sending 
duplicates
-         // since StorageService may send duplicate notifications 
(CASSANDRA-7816, CASSANDRA-8236)
+         // We keep track of the latest status change events we have sent to 
avoid sending duplicates
+         // since StorageService may send duplicate notifications 
(CASSANDRA-7816, CASSANDRA-8236, CASSANDRA-9156)
          private final Map<InetAddress, LatestEvent> latestEvents = new 
ConcurrentHashMap<>();
+         // We also want to delay delivering a NEW_NODE notification until the 
new node has set its RPC ready
+         // state. This tracks the endpoints which have joined, but not yet 
signalled they're ready for clients
 -        private final Set<InetAddress> endpointsPendingJoinedNotification =
 -            Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, 
Boolean>());
++        private final Set<InetAddress> endpointsPendingJoinedNotification = 
ConcurrentHashMap.newKeySet();
+ 
  
          private static final InetAddress bindAll;
          static {
@@@ -538,6 -517,9 +543,9 @@@
  
          public void onUp(InetAddress endpoint)
          {
+             if (endpointsPendingJoinedNotification.remove(endpoint))
+                 onJoinCluster(endpoint);
 -            
++
              onStatusChange(endpoint, 
Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort()));
          }
  

Reply via email to