Author: jbellis
Date: Tue Dec 20 20:39:50 2011
New Revision: 1221482
URL: http://svn.apache.org/viewvc?rev=1221482&view=rev
Log:
merge #3335 from 1.0
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 20 20:39:50 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1198724,1198726-1206097,1206099-1212854,1212938,1214916
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
-/cassandra/branches/cassandra-1.0:1167085-1221019
+/cassandra/branches/cassandra-1.0:1167085-1221481
/cassandra/branches/cassandra-1.0.0:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/branches/cassandra-1.0.5:1208016
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1221482&r1=1221481&r2=1221482&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Dec 20 20:39:50 2011
@@ -34,6 +34,8 @@
* more efficient allocation of small bloom filters (CASSANDRA-3618)
* CLibrary.createHardLinkWithExec() to check for errors (CASSANDRA-3101)
* Avoid creating empty and non cleaned writer during compaction
(CASSANDRA-3616)
+ * stop thrift service in shutdown hook so we can quiesce MessagingService
+ (CASSANDRA-3335)
Merged from 0.8:
* prevent new nodes from thinking down nodes are up forever (CASSANDRA-3626)
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 20 20:39:50 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1198724,1198726-1206097,1206099-1212854,1212938,1214916
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
-/cassandra/branches/cassandra-1.0/contrib:1167085-1221019
+/cassandra/branches/cassandra-1.0/contrib:1167085-1221481
/cassandra/branches/cassandra-1.0.0/contrib:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/branches/cassandra-1.0.5/contrib:1208016
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 20 20:39:50 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1198724,1198726-1206097,1206099-1212854,1212938,1214916
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1221019
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1221481
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/branches/cassandra-1.0.5/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1208016
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 20 20:39:50 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1198724,1198726-1206097,1206099-1212854,1212938,1214916
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1221019
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1221481
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/branches/cassandra-1.0.5/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1208016
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 20 20:39:50 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1198724,1198726-1206097,1206099-1212854,1212938,1214916
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1221019
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1221481
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/branches/cassandra-1.0.5/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1208016
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 20 20:39:50 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1198724,1198726-1206097,1206099-1212854,1212938,1214916
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1221019
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1221481
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/branches/cassandra-1.0.5/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1208016
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 20 20:39:50 2011
@@ -4,7 +4,7 @@
/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1198724,1198726-1206097,1206099-1212854,1212938,1214916
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
-/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1221019
+/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1221481
/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1167229,1167232-1181093,1181741,1181816,1181820,1182951,1183243
/cassandra/branches/cassandra-1.0.5/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1208016
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
Modified:
cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1221482&r1=1221481&r2=1221482&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue
Dec 20 20:39:50 2011
@@ -28,9 +28,7 @@ import java.nio.channels.ServerSocketCha
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
@@ -555,38 +553,20 @@ public final class MessagingService impl
}
}
- public void shutdown()
+ /**
+ * There isn't a good way to shut down the MessagingService. One problem (but not the only one)
+ * is that StorageProxy has no way to communicate back to clients, "I'm nominally alive, but I can't
+ * send that request to the nodes with your data." Neither TimedOut nor Unavailable is appropriate
+ * to return in that situation.
+ *
+ * So instead of shutting down MS and letting StorageProxy/clients cope somehow, we shut down
+ * the Thrift service and then wait for all the outstanding requests to finish or timeout.
+ */
+ public void waitForCallbacks()
{
- logger_.info("Shutting down MessageService...");
+ logger_.info("Waiting for messaging service to quiesce");
// We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
-
- try
- {
- for (SocketThread th : socketThreads)
- th.close();
- }
- catch (IOException e)
- {
- throw new IOError(e);
- }
-
- streamExecutorsLock.lock();
- try
- {
- for (DebuggableThreadPoolExecutor e : streamExecutors.values())
- {
- e.shutdown();
- }
- }
- finally
- {
- streamExecutorsLock.unlock();
- }
-
- callbacks.shutdown();
-
- logger_.info("Waiting for in-progress requests to complete");
callbacks.shutdown();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1221482&r1=1221481&r2=1221482&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Dec 20 20:39:50 2011
@@ -347,7 +347,7 @@ public class StorageService implements I
Gossiper.instance.unregister(migrationManager);
Gossiper.instance.unregister(this);
Gossiper.instance.stop();
- MessagingService.instance().shutdown();
+ MessagingService.instance().waitForCallbacks();
// give it a second so that task accepted before the MessagingService shutdown gets submitted to the stage (to avoid RejectedExecutionException)
try { Thread.sleep(1000L); } catch (InterruptedException e) {}
StageManager.shutdownNow();
@@ -443,13 +443,15 @@ public class StorageService implements I
if (mutationStage.isShutdown())
return; // drained already
+ stopRPCServer();
optionalTasks.shutdown();
Gossiper.instance.stop();
- MessagingService.instance().shutdown();
+ // In-progress writes originating here could generate hints to be written, so shut down MessagingService
+ // before mutation stage, so we can get all the hints saved before shutting down
+ MessagingService.instance().waitForCallbacks();
mutationStage.shutdown();
mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-
StorageProxy.instance.verifyNoHintsInProgress();
List<Future<?>> flushes = new ArrayList<Future<?>>();
@@ -2116,7 +2118,7 @@ public class StorageService implements I
public void run()
{
Gossiper.instance.stop();
- MessagingService.instance().shutdown();
+ MessagingService.instance().waitForCallbacks();
StageManager.shutdownNow();
setMode(Mode.DECOMMISSIONED, true);
// let op be responsible for killing the process
@@ -2508,10 +2510,12 @@ public class StorageService implements I
return;
}
setMode(Mode.DRAINING, "starting drain process", true);
+ stopRPCServer();
optionalTasks.shutdown();
Gossiper.instance.stop();
+
setMode(Mode.DRAINING, "shutting down MessageService", false);
- MessagingService.instance().shutdown();
+ MessagingService.instance().waitForCallbacks();
setMode(Mode.DRAINING, "waiting for streaming", false);
MessagingService.instance().waitForStreaming();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java?rev=1221482&r1=1221481&r2=1221482&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Tue Dec 20 20:39:50 2011
@@ -19,21 +19,21 @@
package org.apache.cassandra.thrift;
+import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.thrift.server.TThreadPoolServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.transport.*;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
/**
@@ -96,7 +96,6 @@ public class CustomTThreadPoolServer ext
}
}
- int failureCount = 0;
try
{
TTransport client = serverTransport_.accept();
@@ -106,9 +105,11 @@ public class CustomTThreadPoolServer ext
}
catch (TTransportException ttx)
{
+ if (ttx.getCause() instanceof SocketTimeoutException) //
thrift sucks
+ continue;
+
if (!stopped_)
{
- ++failureCount;
LOGGER.warn("Transport error occurred during acceptance of
message.", ttx);
}
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java?rev=1221482&r1=1221481&r2=1221482&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/thrift/TCustomServerSocket.java
Tue Dec 20 20:39:50 2011
@@ -149,7 +149,7 @@ public class TCustomServerSocket extends
{
try
{
- serverSocket_.setSoTimeout(0);
+ serverSocket_.setSoTimeout(100);
}
catch (SocketException sx)
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1221482&r1=1221481&r2=1221482&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Tue
Dec 20 20:39:50 2011
@@ -22,14 +22,19 @@ import java.util.*;
import com.google.common.base.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class ExpiringMap<K, V>
{
+ private static final Logger logger =
LoggerFactory.getLogger(ExpiringMap.class);
+
private static class CacheableObject<T>
{
private final T value;
- private final long age;
+ private final long createdAt;
private final long expiration;
CacheableObject(T o, long e)
@@ -37,7 +42,7 @@ public class ExpiringMap<K, V>
assert o != null;
value = o;
expiration = e;
- age = System.currentTimeMillis();
+ createdAt = System.currentTimeMillis();
}
T getValue()
@@ -45,31 +50,31 @@ public class ExpiringMap<K, V>
return value;
}
- boolean isReadyToDie(long start)
+ boolean isReadyToDieAt(long time)
{
- return ((start - age) > expiration);
+ return ((time - createdAt) > expiration);
}
}
private final NonBlockingHashMap<K, CacheableObject<V>> cache = new
NonBlockingHashMap<K, CacheableObject<V>>();
private final Timer timer;
private static int counter = 0;
- private final long expiration;
+ private final long defaultExpiration;
- public ExpiringMap(long expiration)
+ public ExpiringMap(long defaultExpiration)
{
- this(expiration, null);
+ this(defaultExpiration, null);
}
/**
*
- * @param expiration the TTL for objects in the cache in milliseconds
+ * @param defaultExpiration the TTL for objects in the cache in milliseconds
*/
- public ExpiringMap(long expiration, final Function<Pair<K,V>, ?>
postExpireHook)
+ public ExpiringMap(long defaultExpiration, final Function<Pair<K,V>, ?>
postExpireHook)
{
- this.expiration = expiration;
+ this.defaultExpiration = defaultExpiration;
- if (expiration <= 0)
+ if (defaultExpiration <= 0)
{
throw new IllegalArgumentException("Argument specified must be a
positive number");
}
@@ -80,24 +85,28 @@ public class ExpiringMap<K, V>
public void run()
{
long start = System.currentTimeMillis();
+ int n = 0;
for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
{
- if (entry.getValue().isReadyToDie(start))
+ if (entry.getValue().isReadyToDieAt(start))
{
cache.remove(entry.getKey());
+ n++;
if (postExpireHook != null)
postExpireHook.apply(new Pair<K,
V>(entry.getKey(), entry.getValue().getValue()));
}
}
+ logger.trace("Expired {} entries", n);
}
};
- timer.schedule(task, expiration / 2, expiration / 2);
+ timer.schedule(task, defaultExpiration / 2, defaultExpiration / 2);
}
public void shutdown()
{
while (!cache.isEmpty())
{
+ logger.trace("Waiting for {} entries before shutting down
ExpiringMap", cache.size());
try
{
Thread.sleep(100);
@@ -117,7 +126,7 @@ public class ExpiringMap<K, V>
public V put(K key, V value)
{
- return put(key, value, this.expiration);
+ return put(key, value, this.defaultExpiration);
}
public V put(K key, V value, long timeout)
@@ -141,7 +150,7 @@ public class ExpiringMap<K, V>
public long getAge(K key)
{
CacheableObject<V> co = cache.get(key);
- return co == null ? 0 : co.age;
+ return co == null ? 0 : co.createdAt;
}
public int size()
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1221482&r1=1221481&r2=1221482&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Tue
Dec 20 20:39:50 2011
@@ -31,14 +31,11 @@ import org.junit.Test;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
-import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
@@ -88,7 +85,7 @@ public class RemoveTest extends CleanupH
{
SinkManager.clear();
MessagingService.instance().clearCallbacksUnsafe();
- MessagingService.instance().shutdown();
+ MessagingService.instance().waitForCallbacks();
ss.setPartitionerUnsafe(oldPartitioner);
}