Updated Branches: refs/heads/cassandra-2.0 18260c5f2 -> 6b3fe5ee7
Add IRequestSink interface patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-6248 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6b3fe5ee Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6b3fe5ee Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6b3fe5ee Branch: refs/heads/cassandra-2.0 Commit: 6b3fe5ee7ca333bf1d8cef1cb06e72f2b7ccef6a Parents: 18260c5 Author: Aleksey Yeschenko <[email protected]> Authored: Sun Oct 27 23:28:46 2013 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Sun Oct 27 23:28:46 2013 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/net/MessagingService.java | 20 +++- .../apache/cassandra/net/sink/IMessageSink.java | 42 -------- .../apache/cassandra/net/sink/SinkManager.java | 68 ------------- .../apache/cassandra/service/StorageProxy.java | 17 +++- .../org/apache/cassandra/sink/IMessageSink.java | 42 ++++++++ .../org/apache/cassandra/sink/IRequestSink.java | 32 ++++++ .../org/apache/cassandra/sink/SinkManager.java | 100 +++++++++++++++++++ .../cassandra/repair/DifferencerTest.java | 4 +- .../apache/cassandra/repair/ValidatorTest.java | 4 +- .../apache/cassandra/service/RemoveTest.java | 2 +- 11 files changed, 212 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 62c3f52..1052901 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 2.0.3 * Fix modifying column_metadata from thrift (CASSANDRA-6182) * cqlsh: fix LIST USERS output (CASSANDRA-6242) + * Add IRequestSink interface (CASSANDRA-6248) 2.0.2 http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index b66c8a4..6696e87 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -53,7 +53,7 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.locator.ILatencySubscriber; import org.apache.cassandra.metrics.ConnectionMetrics; import org.apache.cassandra.metrics.DroppedMessageMetrics; -import org.apache.cassandra.net.sink.SinkManager; +import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.*; @@ -708,9 +708,13 @@ public final class MessagingService implements MessagingServiceMBean if (state != null) state.trace("Message received from {}", message.from); + Verb verb = message.verb; message = SinkManager.processInboundMessage(message, id); if (message == null) + { + incrementRejectedMessages(verb); return; + } Runnable runnable = new MessageDeliveryTask(message, id, timestamp); TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType()); @@ -798,6 +802,20 @@ public final class MessagingService implements MessagingServiceMBean droppedMessages.get(verb).dropped.mark(); } + /** + * Same as incrementDroppedMessages(), but allows non-droppable verbs. Called for IMessageSink-caused message drops. + */ + private void incrementRejectedMessages(Verb verb) + { + DroppedMessageMetrics metrics = droppedMessages.get(verb); + if (metrics == null) + { + metrics = new DroppedMessageMetrics(verb); + droppedMessages.put(verb, metrics); + } + metrics.dropped.mark(); + } + private void logDroppedMessages() { boolean logTpstats = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/sink/IMessageSink.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/sink/IMessageSink.java b/src/java/org/apache/cassandra/net/sink/IMessageSink.java deleted file mode 100644 index d6b6496..0000000 --- a/src/java/org/apache/cassandra/net/sink/IMessageSink.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.net.sink; - -import java.net.InetAddress; - -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; - -public interface IMessageSink -{ - /** - * Transform or drop an outgoing message - * - * @return null if the message is dropped, or the transformed message to send, which may be just - * the original message - */ - public MessageOut handleMessage(MessageOut message, int id, InetAddress to); - - /** - * Transform or drop an incoming message - * - * @return null if the message is dropped, or the transformed message to receive, which may be just - * the original message - */ - public MessageIn handleMessage(MessageIn message, int id, InetAddress to); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/net/sink/SinkManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/sink/SinkManager.java b/src/java/org/apache/cassandra/net/sink/SinkManager.java deleted file mode 100644 index 7b67afe..0000000 --- a/src/java/org/apache/cassandra/net/sink/SinkManager.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.net.sink; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; - -public class SinkManager -{ - private static final List<IMessageSink> sinks = new ArrayList<IMessageSink>(); - - public static void add(IMessageSink ms) - { - sinks.add(ms); - } - - public static void clear() - { - sinks.clear(); - } - - public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to) - { - if (sinks.isEmpty()) - return message; - - for (IMessageSink ms : sinks) - { - message = ms.handleMessage(message, id, to); - if (message == null) - return null; - } - return message; - } - - public static MessageIn processInboundMessage(MessageIn message, int id) - { - if (sinks.isEmpty()) - return message; - - for (IMessageSink ms : sinks) - { - message = ms.handleMessage(message, id, null); - if (message == null) - return null; - } - return message; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 6dd702b..52a2a47 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -62,6 +62,7 @@ import org.apache.cassandra.metrics.ReadRepairMetrics; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.*; +import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.triggers.TriggerExecutor; import org.apache.cassandra.utils.*; @@ -992,8 +993,12 @@ public class StorageProxy implements StorageProxyMBean { public void runMayThrow() { - rm.apply(); - responseHandler.response(null); + IMutation processed = SinkManager.processWriteRequest(rm); + if (processed != null) + { + processed.apply(); + responseHandler.response(null); + } } }; StageManager.getStage(Stage.MUTATION).execute(runnable); @@ -1104,8 +1109,12 @@ public class StorageProxy implements StorageProxyMBean { public void runMayThrow() { - assert mutation instanceof CounterMutation; - final CounterMutation cm = (CounterMutation) mutation; + IMutation processed = SinkManager.processWriteRequest(mutation); + if (processed == null) + return; + + assert processed instanceof CounterMutation; + final CounterMutation cm = (CounterMutation) processed; // apply mutation cm.apply(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/IMessageSink.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/sink/IMessageSink.java b/src/java/org/apache/cassandra/sink/IMessageSink.java new file mode 100644 index 0000000..996e7ff --- /dev/null +++ b/src/java/org/apache/cassandra/sink/IMessageSink.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sink; + +import java.net.InetAddress; + +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; + +public interface IMessageSink +{ + /** + * Transform or drop an outgoing message + * + * @return null if the message is dropped, or the transformed message to send, which may be just + * the original message + */ + MessageOut handleMessage(MessageOut message, int id, InetAddress to); + + /** + * Transform or drop an incoming message + * + * @return null if the message is dropped, or the transformed message to receive, which may be just + * the original message + */ + MessageIn handleMessage(MessageIn message, int id, InetAddress to); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/IRequestSink.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/sink/IRequestSink.java b/src/java/org/apache/cassandra/sink/IRequestSink.java new file mode 100644 index 0000000..8d68ce8 --- /dev/null +++ b/src/java/org/apache/cassandra/sink/IRequestSink.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sink; + +import org.apache.cassandra.db.IMutation; + +public interface IRequestSink +{ + /** + * Transform or drop a write request (represented by a RowMutation). + * + * @param mutation the RowMutation to be applied locally. + * @return null if the mutation is to be dropped, or the transformed mutation to apply, which may be just + * the original mutation. + */ + IMutation handleWriteRequest(IMutation mutation); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/src/java/org/apache/cassandra/sink/SinkManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/sink/SinkManager.java b/src/java/org/apache/cassandra/sink/SinkManager.java new file mode 100644 index 0000000..9b422dc --- /dev/null +++ b/src/java/org/apache/cassandra/sink/SinkManager.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sink; + +import java.net.InetAddress; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; + +public class SinkManager +{ + private static final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>(); + private static final Set<IRequestSink> requestSinks = new CopyOnWriteArraySet<>(); + + public static void add(IMessageSink ms) + { + messageSinks.add(ms); + } + + public static void add(IRequestSink rs) + { + requestSinks.add(rs); + } + + public static void remove(IMessageSink ms) + { + messageSinks.remove(ms); + } + + public static void remove(IRequestSink rs) + { + requestSinks.remove(rs); + } + + public static void clear() + { + messageSinks.clear(); + requestSinks.clear(); + } + + public static MessageOut processOutboundMessage(MessageOut message, int id, InetAddress to) + { + if (messageSinks.isEmpty()) + return message; + + for (IMessageSink ms : messageSinks) + { + message = ms.handleMessage(message, id, to); + if (message == null) + return null; + } + return message; + } + + public static MessageIn processInboundMessage(MessageIn message, int id) + { + if (messageSinks.isEmpty()) + return message; + + for (IMessageSink ms : messageSinks) + { + message = ms.handleMessage(message, id, null); + if (message == null) + return null; + } + return message; + } + + public static IMutation processWriteRequest(IMutation mutation) + { + if (requestSinks.isEmpty()) + return mutation; + + for (IRequestSink rs : requestSinks) + { + mutation = rs.handleWriteRequest(mutation); + if (mutation == null) + return null; + } + return mutation; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/repair/DifferencerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java index 3f259f2..b6dce40 100644 --- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java +++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java @@ -33,8 +33,8 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.sink.IMessageSink; -import org.apache.cassandra.net.sink.SinkManager; +import org.apache.cassandra.sink.IMessageSink; +import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.SyncComplete; import org.apache.cassandra.utils.MerkleTree; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/repair/ValidatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index a6be1b1..9fa5d89 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -35,8 +35,8 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.sink.IMessageSink; -import org.apache.cassandra.net.sink.SinkManager; +import org.apache.cassandra.sink.IMessageSink; +import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.service.StorageService; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6b3fe5ee/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 1b3eb48..62dd636 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -39,7 +39,7 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.sink.SinkManager; +import org.apache.cassandra.sink.SinkManager; import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals;
