Author: jbellis
Date: Mon Nov 29 22:28:25 2010
New Revision: 1040325
URL: http://svn.apache.org/viewvc?rev=1040325&view=rev
Log:
clean up SinkManager
patch by jbellis
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
Mon Nov 29 22:28:25 2010
@@ -33,6 +33,7 @@ public class MessageDeliveryTask impleme
public MessageDeliveryTask(Message message)
{
+ assert message != null;
message_ = message;
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
Mon Nov 29 22:28:25 2010
@@ -305,7 +305,7 @@ public class MessagingService implements
}
// message sinks are a testing hook
- Message processedMessage =
SinkManager.processClientMessageSink(message, to);
+ Message processedMessage = SinkManager.processClientMessage(message,
to);
if (processedMessage == null)
{
return;
@@ -385,11 +385,13 @@ public class MessagingService implements
public static void receive(Message message)
{
- message = SinkManager.processServerMessageSink(message, null);
+ message = SinkManager.processServerMessage(message);
+ if (message == null)
+ return;
Runnable runnable = new MessageDeliveryTask(message);
ExecutorService stage =
StageManager.getStage(message.getMessageType());
- assert stage != null;
+ assert stage != null : "No stage for message type " +
message.getMessageType();
stage.execute(runnable);
}
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
(original)
+++
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/sink/SinkManager.java
Mon Nov 29 22:28:25 2010
@@ -18,56 +18,50 @@
package org.apache.cassandra.net.sink;
-import java.util.*;
-import java.io.IOException;
-
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.cassandra.net.Message;
public class SinkManager
{
- private static LinkedList<IMessageSink> messageSinks_ = new
LinkedList<IMessageSink>();
+ private static List<IMessageSink> sinks = new ArrayList<IMessageSink>();
- public static boolean isInitialized()
+ public static void addSink(IMessageSink ms)
{
- return ( messageSinks_.size() > 0 );
+ sinks.add(ms);
}
- public static void addMessageSink(IMessageSink ms)
+ public static void clear()
{
- messageSinks_.addLast(ms);
- }
-
- public static void clearSinks(){
- messageSinks_.clear();
+ sinks.clear();
}
- public static Message processClientMessageSink(Message message,
InetAddress to)
+ public static Message processClientMessage(Message message, InetAddress to)
{
- ListIterator<IMessageSink> li = messageSinks_.listIterator();
- while ( li.hasNext() )
+ if (sinks.isEmpty())
+ return message;
+
+ for (IMessageSink ms : sinks)
{
- IMessageSink ms = li.next();
message = ms.handleMessage(message, to);
- if ( message == null )
- {
+ if (message == null)
return null;
- }
}
return message;
}
- public static Message processServerMessageSink(Message message,
InetAddress to)
+ public static Message processServerMessage(Message message)
{
- ListIterator<IMessageSink> li =
messageSinks_.listIterator(messageSinks_.size());
- while ( li.hasPrevious() )
+ if (sinks.isEmpty())
+ return message;
+
+ for (IMessageSink ms : sinks)
{
- IMessageSink ms = li.previous();
- message = ms.handleMessage(message, to);
- if ( message == null )
- {
+ message = ms.handleMessage(message, null);
+ if (message == null)
return null;
- }
}
return message;
}
Modified:
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1040325&r1=1040324&r2=1040325&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
(original)
+++
cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/service/RemoveTest.java
Mon Nov 29 22:28:25 2010
@@ -83,7 +83,7 @@ public class RemoveTest extends CleanupH
@After
public void tearDown()
{
- SinkManager.clearSinks();
+ SinkManager.clear();
MessagingService.shutdown();
ss.setPartitionerUnsafe(oldPartitioner);
}
@@ -111,7 +111,7 @@ public class RemoveTest extends CleanupH
final String token =
partitioner.getTokenFactory().toString(endpointTokens.get(5));
ReplicationSink rSink = new ReplicationSink();
- SinkManager.addMessageSink(rSink);
+ SinkManager.addSink(rSink);
// start removal in background and send replication confirmations
final AtomicBoolean success = new AtomicBoolean(false);
@@ -159,8 +159,8 @@ public class RemoveTest extends CleanupH
NotificationSink nSink = new NotificationSink();
ReplicationSink rSink = new ReplicationSink();
- SinkManager.addMessageSink(nSink);
- SinkManager.addMessageSink(rSink);
+ SinkManager.addSink(nSink);
+ SinkManager.addSink(rSink);
assertEquals(0, tmd.getLeavingEndpoints().size());