This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit a27092926b42a83ec0a1e6188677329737c5a3f4
Merge: cd82046 b2f2c70
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Fri Jan 31 14:37:25 2020 +0100

    Merge branch 'cassandra-2.2' into cassandra-3.0

 .../apache/cassandra/distributed/api/IMessage.java |   8 +-
 .../cassandra/distributed/api/IMessageFilters.java |  28 ++-
 .../distributed/impl/AbstractCluster.java          |  17 +-
 .../distributed/impl/IInvokableInstance.java       |   1 -
 .../cassandra/distributed/impl/Instance.java       | 187 ++++++++++--------
 .../cassandra/distributed/impl/MessageFilters.java |  79 ++++----
 .../distributed/test/MessageFiltersTest.java       | 210 +++++++++++++++++++++
 7 files changed, 395 insertions(+), 135 deletions(-)

diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 3de5ed8,0647198..5a4dcf4
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -192,7 -192,9 +193,10 @@@ public class Instance extends IsolatedE
      {
          BiConsumer<InetAddressAndPort, IMessage> deliverToInstance = (to, 
message) -> cluster.get(to).receiveMessage(message);
          BiConsumer<InetAddressAndPort, IMessage> 
deliverToInstanceIfNotFiltered = (to, message) -> {
-             if (cluster.filters().permit(this, cluster.get(to), 
message.verb()))
+             int fromNum = config().num();
+             int toNum = cluster.get(to).config().num();
++
+             if (cluster.filters().permit(fromNum, toNum, message))
                  deliverToInstance.accept(to, message);
          };
  
@@@ -242,46 -265,35 +267,34 @@@
  
          public boolean allowOutgoingMessage(MessageOut messageOut, int id, 
InetAddress to)
          {
-             try (DataOutputBuffer out = new DataOutputBuffer(1024))
+             InetAddressAndPort from = broadcastAddressAndPort();
 -            InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
+             assert from.equals(lookupAddressAndPort.apply(messageOut.from));
 -
 -            IMessage serialized = serializeMessage(messageOut, id, 
broadcastAddressAndPort(), lookupAddressAndPort.apply(messageOut.from));
++            InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
++            IMessage message = serializeMessage(messageOut, id, from, toFull);
+ 
+             // Tracing logic - similar to 
org.apache.cassandra.net.OutboundTcpConnection.writeConnected
+             byte[] sessionBytes = (byte[]) 
messageOut.parameters.get(Tracing.TRACE_HEADER);
+             if (sessionBytes != null)
              {
-                 InetAddressAndPort from = broadcastAddressAndPort();
-                 assert 
from.equals(lookupAddressAndPort.apply(messageOut.from));
-                 InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
-                 int version = MessagingService.instance().getVersion(to);
- 
-                 // Tracing logic - similar to 
org.apache.cassandra.net.OutboundTcpConnection.writeConnected
-                 byte[] sessionBytes = (byte[]) 
messageOut.parameters.get(Tracing.TRACE_HEADER);
-                 if (sessionBytes != null)
+                 UUID sessionId = 
UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
+                 TraceState state = Tracing.instance.get(sessionId);
 -                String message = String.format("Sending %s message to %s", 
messageOut.verb, to);
++                String traceMessage = String.format("Sending %s message to 
%s", messageOut.verb, toFull.address);
+                 // session may have already finished; see CASSANDRA-5668
+                 if (state == null)
                  {
-                     UUID sessionId = 
UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
-                     TraceState state = Tracing.instance.get(sessionId);
-                     String message = String.format("Sending %s message to 
%s", messageOut.verb, toFull.address);
-                     // session may have already finished; see CASSANDRA-5668
-                     if (state == null)
-                     {
-                         byte[] traceTypeBytes = (byte[]) 
messageOut.parameters.get(Tracing.TRACE_TYPE);
-                         Tracing.TraceType traceType = traceTypeBytes == null 
? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
-                         
TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, 
traceType.getTTL());
-                     }
-                     else
-                     {
-                         state.trace(message);
-                         if (messageOut.verb == 
MessagingService.Verb.REQUEST_RESPONSE)
-                             Tracing.instance.doneWithNonLocalSession(state);
-                     }
+                     byte[] traceTypeBytes = (byte[]) 
messageOut.parameters.get(Tracing.TRACE_TYPE);
+                     Tracing.TraceType traceType = traceTypeBytes == null ? 
Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
 -                    
TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), message, -1, 
traceType.getTTL());
++                    
TraceState.mutateWithTracing(ByteBuffer.wrap(sessionBytes), traceMessage, -1, 
traceType.getTTL());
+                 }
+                 else
+                 {
 -                    state.trace(message);
++                    state.trace(traceMessage);
+                     if (messageOut.verb == 
MessagingService.Verb.REQUEST_RESPONSE)
+                         Tracing.instance.doneWithNonLocalSession(state);
                  }
- 
-                 out.writeInt(MessagingService.PROTOCOL_MAGIC);
-                 out.writeInt(id);
-                 long timestamp = System.currentTimeMillis();
-                 out.writeInt((int) timestamp);
-                 messageOut.serialize(out, version);
-                 deliver.accept(toFull, new Message(messageOut.verb.ordinal(), 
out.toByteArray(), id, version, from));
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
              }
+ 
 -            deliver.accept(toFull, serialized);
++            deliver.accept(toFull, message);
              return false;
          }
  
@@@ -292,50 -304,46 +305,47 @@@
          }
      }
  
-     public void receiveMessage(IMessage imessage)
 -    public static Pair<MessageIn<Object>, Integer> 
deserializeMessage(IMessage msg)
++
++    public static Pair<MessageIn<Object>, Integer> 
deserializeMessage(IMessage imessage)
      {
-         sync(() -> {
-             // Based on 
org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
-             try (DataInputBuffer input = new 
DataInputBuffer(imessage.bytes()))
+         // Based on 
org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
 -        try (DataInputStream input = new DataInputStream(new 
ByteArrayInputStream(msg.bytes())))
++        try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
+         {
 -            int version = msg.version();
++            int version = imessage.version();
+             if (version > MessagingService.current_version)
              {
-                 int version = imessage.version();
-                 if (version > MessagingService.current_version)
-                 {
-                     throw new IllegalStateException(String.format("Node%d 
received message version %d but current version is %d",
-                                                                   
this.config.num(),
-                                                                   version,
-                                                                   
MessagingService.current_version));
-                 }
+                 throw new IllegalStateException(String.format("Received 
message version %d but current version is %d",
+                                                               version,
+                                                               
MessagingService.current_version));
+             }
  
-                 MessagingService.validateMagic(input.readInt());
-                 int id;
-                 if (version < MessagingService.VERSION_20)
-                     id = Integer.parseInt(input.readUTF());
-                 else
-                     id = input.readInt();
+             MessagingService.validateMagic(input.readInt());
+             int id;
+             if (version < MessagingService.VERSION_20)
+                 id = Integer.parseInt(input.readUTF());
+             else
+                 id = input.readInt();
 -            if (msg.id() != id)
 -                throw new IllegalStateException(String.format("Message id 
mismatch: %d != %d", msg.id(), id));
++            if (imessage.id() != id)
++                throw new IllegalStateException(String.format("Message id 
mismatch: %d != %d", imessage.id(), id));
  
-                 long timestamp = System.currentTimeMillis();
-                 boolean isCrossNodeTimestamp = false;
-                 // make sure to readInt, even if cross_node_to is not enabled
-                 int partial = input.readInt();
-                 if (DatabaseDescriptor.hasCrossNodeTimeout())
-                 {
-                     long crossNodeTimestamp = (timestamp & 
0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
-                     isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
-                     timestamp = crossNodeTimestamp;
-                 }
+             // make sure to readInt, even if cross_node_to is not enabled
+             int partial = input.readInt();
  
-                 MessageIn message = MessageIn.read(input, version, id);
-                 if (message == null)
-                 {
-                     // callback expired; nothing to do
-                     return;
-                 }
-                 if (version <= MessagingService.current_version)
-                 {
-                     MessagingService.instance().receive(message, id, 
timestamp, isCrossNodeTimestamp);
-                 }
-                 // else ignore message
+             return Pair.create(MessageIn.read(input, version, id), partial);
+         }
+         catch (IOException e)
+         {
+             throw new RuntimeException();
+         }
+     }
+ 
+     public void receiveMessage(IMessage imessage)
+     {
+         sync(() -> {
+             Pair<MessageIn<Object>, Integer> deserialized = null;
+             try
+             {
+                 deserialized = deserializeMessage(imessage);
              }
              catch (Throwable t)
              {
diff --cc 
test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
index c1607f8,c92553f..833677b
--- a/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/MessageFilters.java
@@@ -19,16 -19,11 +19,14 @@@
  package org.apache.cassandra.distributed.impl;
  
  import java.util.Arrays;
- import java.util.Set;
- import java.util.concurrent.CopyOnWriteArraySet;
- import java.util.function.BiConsumer;
+ import java.util.List;
+ import java.util.concurrent.CopyOnWriteArrayList;
++import java.util.function.Supplier;
++
++import com.google.common.annotations.VisibleForTesting;
  
- import org.apache.cassandra.distributed.api.IInstance;
  import org.apache.cassandra.distributed.api.IMessage;
  import org.apache.cassandra.distributed.api.IMessageFilters;
- import org.apache.cassandra.distributed.api.ICluster;
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.MessagingService;
  
  public class MessageFilters implements IMessageFilters
  {
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
index 0000000,96974d8..07e7428
mode 000000,100644..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/MessageFiltersTest.java
@@@ -1,0 -1,210 +1,210 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.util.Arrays;
 -import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
++import com.google.common.collect.Sets;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+ import org.apache.cassandra.distributed.api.IMessage;
+ import org.apache.cassandra.distributed.api.IMessageFilters;
+ import org.apache.cassandra.distributed.impl.Instance;
+ import org.apache.cassandra.distributed.impl.MessageFilters;
+ import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
+ 
+ public class MessageFiltersTest extends DistributedTestBase
+ {
+     @Test
+     public void simpleFiltersTest() throws Throwable
+     {
+         int VERB1 = MessagingService.Verb.READ.ordinal();
+         int VERB2 = MessagingService.Verb.REQUEST_RESPONSE.ordinal();
+         int VERB3 = MessagingService.Verb.READ_REPAIR.ordinal();
+         int i1 = 1;
+         int i2 = 2;
+         int i3 = 3;
+         String MSG1 = "msg1";
+         String MSG2 = "msg2";
+ 
+         MessageFilters filters = new MessageFilters();
+         MessageFilters.Filter filter = filters.allVerbs().from(1).drop();
+ 
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB3, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         filter.off();
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         filters.reset();
+ 
+         filters.verbs(VERB1).from(1).to(2).drop();
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB2, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i3, msg(VERB2, MSG1)));
+ 
+         filters.reset();
+         AtomicInteger counter = new AtomicInteger();
+         filters.verbs(VERB1).from(1).to(2).messagesMatching((from, to, msg) 
-> {
+             counter.incrementAndGet();
+             return Arrays.equals(msg.bytes(), MSG1.getBytes());
+         }).drop();
+         Assert.assertFalse(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertEquals(counter.get(), 1);
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG2)));
+         Assert.assertEquals(counter.get(), 2);
+ 
+         // filter chain gets interrupted because a higher level filter 
returns no match
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertEquals(counter.get(), 2);
+         Assert.assertTrue(filters.permit(i2, i1, msg(VERB2, MSG1)));
+         Assert.assertEquals(counter.get(), 2);
+         filters.reset();
+ 
+         filters.allVerbs().from(3, 2).to(2, 1).drop();
+         Assert.assertFalse(filters.permit(i3, i1, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i3, i2, msg(VERB1, MSG1)));
+         Assert.assertFalse(filters.permit(i2, i1, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i2, i3, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+         filters.reset();
+ 
+         counter.set(0);
+         filters.allVerbs().from(1).to(2).messagesMatching((from, to, msg) -> {
+             counter.incrementAndGet();
+             return false;
+         }).drop();
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i3, msg(VERB1, MSG1)));
+         Assert.assertTrue(filters.permit(i1, i2, msg(VERB1, MSG1)));
+         Assert.assertEquals(2, counter.get());
+     }
+ 
+     IMessage msg(int verb, String msg)
+     {
+         return new IMessage()
+         {
+             public int verb() { return verb; }
+             public byte[] bytes() { return msg.getBytes(); }
+             public int id() { return 0; }
+             public int version() { return 0;  }
+             public InetAddressAndPort from() { return null; }
+         };
+     }
+ 
+     @Test
+     public void testFilters() throws Throwable
+     {
+         String read = "SELECT * FROM " + KEYSPACE + ".tbl";
+         String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES 
(1, 1, 1)";
+ 
+         try (Cluster cluster = Cluster.create(2))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': " + 
cluster.size() + "};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             // Reads and writes are going to time out in both directions
+             cluster.filters().allVerbs().from(1).to(2).drop();
+             for (int i : new int[]{ 1, 2 })
+                 assertTimeOut(() -> cluster.coordinator(i).execute(read, 
ConsistencyLevel.ALL));
+             for (int i : new int[]{ 1, 2 })
+                 assertTimeOut(() -> cluster.coordinator(i).execute(write, 
ConsistencyLevel.ALL));
+ 
+             cluster.filters().reset();
+             // Reads are going to timeout only when 1 serves as a coordinator
+             
cluster.verbs(MessagingService.Verb.RANGE_SLICE).from(1).to(2).drop();
+             assertTimeOut(() -> cluster.coordinator(1).execute(read, 
ConsistencyLevel.ALL));
+             cluster.coordinator(2).execute(read, ConsistencyLevel.ALL);
+ 
+             // Writes work in both directions
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+         }
+     }
+ 
+     @Test
+     public void testMessageMatching() throws Throwable
+     {
+         String read = "SELECT * FROM " + KEYSPACE + ".tbl";
+         String write = "INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES 
(1, 1, 1)";
+ 
+         try (Cluster cluster = Cluster.create(2))
+         {
+             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': " + 
cluster.size() + "};");
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+ 
+             AtomicInteger counter = new AtomicInteger();
+ 
 -            Set<Integer> verbs = new 
HashSet<>(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
 -                                                             
MessagingService.Verb.MUTATION.ordinal()));
++            Set<Integer> verbs = 
Sets.newHashSet(Arrays.asList(MessagingService.Verb.RANGE_SLICE.ordinal(),
++                                                               
MessagingService.Verb.MUTATION.ordinal()));
+ 
+             // Reads and writes are going to time out in both directions
+             IMessageFilters.Filter filter = cluster.filters()
+                                                    .allVerbs()
+                                                    .from(1)
+                                                    .to(2)
+                                                    .messagesMatching((from, 
to, msg) -> {
+                                                        // Decode and verify 
message on instance; return the result back here
+                                                        Integer id = 
cluster.get(1).callsOnInstance((IIsolatedExecutor.SerializableCallable<Integer>)
 () -> {
+                                                            MessageIn decoded 
= Instance.deserializeMessage(msg).left;
+                                                            if (decoded != 
null)
+                                                                return 
(Integer) decoded.verb.ordinal();
+                                                            return -1;
+                                                        }).call();
+                                                        if (id > 0)
+                                                            
Assert.assertTrue(verbs.contains(id));
+                                                        
counter.incrementAndGet();
+                                                        return false;
+                                                    }).drop();
+ 
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(read, ConsistencyLevel.ALL);
+             for (int i : new int[]{ 1, 2 })
+                 cluster.coordinator(i).execute(write, ConsistencyLevel.ALL);
+ 
+             filter.off();
+             Assert.assertEquals(4, counter.get());
+         }
+     }
+ 
+     private static void assertTimeOut(Runnable r)
+     {
+         try
+         {
+             r.run();
+             Assert.fail("Should have timed out");
+         }
+         catch (Throwable t)
+         {
+             if (!t.toString().contains("TimeoutException"))
+                 throw t;
+             // ignore
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to