This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 9b90c90de3 CEP-15 (Accord) Expected reply message with verb ACCORD_INFORM_OF_TXNID_RSP but got ACCORD_SIMPLE_RSP
9b90c90de3 is described below

commit 9b90c90de30ad1317e59bc38ed9372cbdd9abfec
Author: David Capwell <[email protected]>
AuthorDate: Fri Mar 31 15:04:51 2023 -0700

    CEP-15 (Accord) Expected reply message with verb ACCORD_INFORM_OF_TXNID_RSP but got ACCORD_SIMPLE_RSP
    
    patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18375
---
 src/java/org/apache/cassandra/net/Messaging.java   | 27 ++++++
 .../org/apache/cassandra/net/MessagingService.java |  4 +-
 src/java/org/apache/cassandra/net/Verb.java        | 99 ++++++++++++++--------
 .../service/accord/AccordMessageSink.java          | 19 ++++-
 .../service/accord/async/AsyncOperation.java       | 17 +++-
 .../service/accord/AccordMessageSinkTest.java      | 56 ++++++++++++
 6 files changed, 180 insertions(+), 42 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/Messaging.java b/src/java/org/apache/cassandra/net/Messaging.java
new file mode 100644
index 0000000000..cdc6f7f1ee
--- /dev/null
+++ b/src/java/org/apache/cassandra/net/Messaging.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+
+public interface Messaging
+{
+    void send(Message message, InetAddressAndPort to);
+    void sendWithCallback(Message message, InetAddressAndPort to, RequestCallback cb);
+}
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 268a3c3ee8..092e45cfd7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -203,7 +203,7 @@ import static 
org.apache.cassandra.utils.Throwables.maybeFail;
  * implemented in {@link 
org.apache.cassandra.db.virtual.InternodeInboundTable} and
  * {@link org.apache.cassandra.db.virtual.InternodeOutboundTable} respectively.
  */
-public class MessagingService extends MessagingServiceMBeanImpl
+public class MessagingService extends MessagingServiceMBeanImpl implements Messaging
 {
     private static final Logger logger = 
LoggerFactory.getLogger(MessagingService.class);
 
@@ -352,6 +352,7 @@ public class MessagingService extends 
MessagingServiceMBeanImpl
      * @param cb      callback interface which is used to pass the responses or
      *                suggest that a timeout occurred to the invoker of the 
send().
      */
+    @Override
     public void sendWithCallback(Message message, InetAddressAndPort to, 
RequestCallback cb)
     {
         sendWithCallback(message, to, cb, null);
@@ -390,6 +391,7 @@ public class MessagingService extends 
MessagingServiceMBeanImpl
      * @param message messages to be sent.
      * @param to      endpoint to which the message needs to be sent
      */
+    @Override
     public void send(Message message, InetAddressAndPort to)
     {
         send(message, to, null);
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index cbde8e8eee..15584b7e8f 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -19,15 +19,21 @@ package org.apache.cassandra.net;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
+import java.util.stream.IntStream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
+import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
 import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
@@ -222,42 +228,31 @@ public enum Verb
     PAXOS2_CLEANUP_COMPLETE_REQ      (48, P2, repairTimeout, PAXOS_REPAIR,     
 () -> PaxosCleanupComplete.serializer,         () -> 
PaxosCleanupComplete.verbHandler,                      
PAXOS2_CLEANUP_COMPLETE_RSP      ),
 
     // accord
-    ACCORD_SIMPLE_RSP               (119, P2, writeTimeout, REQUEST_RESPONSE,  
   () -> EnumSerializer.simpleReply,           RESPONSE_HANDLER),
-
-    ACCORD_PREACCEPT_RSP            (121, P2, writeTimeout,    
REQUEST_RESPONSE,  () -> PreacceptSerializers.reply,           
RESPONSE_HANDLER),
-    ACCORD_PREACCEPT_REQ            (120, P2, writeTimeout,    ACCORD,         
   () -> PreacceptSerializers.request,         () -> 
AccordService.instance().verbHandler(),       ACCORD_PREACCEPT_RSP),
-
-    ACCORD_ACCEPT_RSP               (124, P2, writeTimeout,    
REQUEST_RESPONSE,  () -> AcceptSerializers.reply,              
RESPONSE_HANDLER),
-    ACCORD_ACCEPT_REQ               (122, P2, writeTimeout,    ACCORD,         
   () -> AcceptSerializers.request,            () -> 
AccordService.instance().verbHandler(),       ACCORD_ACCEPT_RSP   ),
-    ACCORD_ACCEPT_INVALIDATE_REQ    (123, P2, writeTimeout,    ACCORD,         
   () -> AcceptSerializers.invalidate,         () -> 
AccordService.instance().verbHandler(),       ACCORD_ACCEPT_RSP   ),
-
-    ACCORD_READ_RSP                 (128, P2, writeTimeout,    
REQUEST_RESPONSE,  () -> ReadDataSerializers.reply,            
RESPONSE_HANDLER),
-    ACCORD_READ_REQ                 (127, P2, writeTimeout,    ACCORD,         
   () -> ReadDataSerializers.request,          () -> 
AccordService.instance().verbHandler(),       ACCORD_READ_RSP     ),
-    ACCORD_COMMIT_REQ               (125, P2, writeTimeout,    ACCORD,         
   () -> CommitSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP     ),
-    ACCORD_COMMIT_INVALIDATE_REQ    (126, P2, writeTimeout,    ACCORD,         
   () -> CommitSerializers.invalidate,         () -> 
AccordService.instance().verbHandler()),
-
-    ACCORD_APPLY_RSP                (130, P2, writeTimeout,    
REQUEST_RESPONSE,  () -> ApplySerializers.reply,               
RESPONSE_HANDLER),
-    ACCORD_APPLY_REQ                (129, P2, writeTimeout,    ACCORD,         
   () -> ApplySerializers.request,             () -> 
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP),
-
-    ACCORD_RECOVER_RSP              (134, P2, writeTimeout,    
REQUEST_RESPONSE,  () -> RecoverySerializers.reply,            
RESPONSE_HANDLER),
-    ACCORD_RECOVER_REQ              (133, P2, writeTimeout,    ACCORD,         
   () -> RecoverySerializers.request,          () -> 
AccordService.instance().verbHandler(),       ACCORD_RECOVER_RSP  ),
-    ACCORD_BEGIN_INVALIDATE_RSP     (136, P2, writeTimeout, REQUEST_RESPONSE, 
() -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER),
-    ACCORD_BEGIN_INVALIDATE_REQ     (135, P2, writeTimeout, ACCORD, () -> 
BeginInvalidationSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP),
-    ACCORD_WAIT_COMMIT_RSP          (138, P2, writeTimeout,    
REQUEST_RESPONSE,  () -> WaitOnCommitSerializer.reply,         
RESPONSE_HANDLER),
-    ACCORD_WAIT_COMMIT_REQ          (137, P2, writeTimeout,    ACCORD,         
   () -> WaitOnCommitSerializer.request,       () -> 
AccordService.instance().verbHandler(),     ACCORD_WAIT_COMMIT_RSP),
-
-    ACCORD_INFORM_OF_TXNID_RSP(140, P2, writeTimeout, REQUEST_RESPONSE, () -> 
EnumSerializer.simpleReply, RESPONSE_HANDLER),
-    ACCORD_INFORM_OF_TXNID_REQ(139, P2, writeTimeout, ACCORD, () -> 
InformOfTxnIdSerializers.request, () -> AccordService.instance().verbHandler(), 
ACCORD_INFORM_OF_TXNID_RSP),
-
-    ACCORD_INFORM_HOME_DURABLE_REQ(141, P2, writeTimeout, ACCORD, () -> 
InformHomeDurableSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP),
-
-    ACCORD_INFORM_DURABLE_REQ(143, P2, writeTimeout, ACCORD, () -> 
InformDurableSerializers.request, () -> AccordService.instance().verbHandler(), 
ACCORD_SIMPLE_RSP),
-
-    ACCORD_CHECK_STATUS_RSP         (146, P2, writeTimeout, REQUEST_RESPONSE,  
   () -> CheckStatusSerializers.reply,         RESPONSE_HANDLER),
-    ACCORD_CHECK_STATUS_REQ         (145, P2, writeTimeout, ACCORD,            
   () -> CheckStatusSerializers.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP),
-
-    ACCORD_GET_DEPS_RSP         (148, P2, writeTimeout, REQUEST_RESPONSE, () 
-> GetDepsSerializers.reply, RESPONSE_HANDLER),
-    ACCORD_GET_DEPS_REQ         (147, P2, writeTimeout, ACCORD,               
() -> GetDepsSerializers.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP),
+    ACCORD_SIMPLE_RSP               (119, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> EnumSerializer.simpleReply,           RESPONSE_HANDLER                   
                                         ),
+    ACCORD_PREACCEPT_RSP            (121, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> PreacceptSerializers.reply,           RESPONSE_HANDLER                   
                                         ),
+    ACCORD_PREACCEPT_REQ            (120, P2, writeTimeout, ACCORD,            
 () -> PreacceptSerializers.request,         () -> 
AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP          ),
+    ACCORD_ACCEPT_RSP               (124, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> AcceptSerializers.reply,              RESPONSE_HANDLER                   
                                         ),
+    ACCORD_ACCEPT_REQ               (122, P2, writeTimeout, ACCORD,            
 () -> AcceptSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
+    ACCORD_ACCEPT_INVALIDATE_REQ    (123, P2, writeTimeout, ACCORD,            
 () -> AcceptSerializers.invalidate,         () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
+    ACCORD_READ_RSP                 (128, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> ReadDataSerializers.reply,            RESPONSE_HANDLER                   
                                         ),
+    ACCORD_READ_REQ                 (127, P2, writeTimeout, ACCORD,            
 () -> ReadDataSerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP               ),
+    ACCORD_COMMIT_REQ               (125, P2, writeTimeout, ACCORD,            
 () -> CommitSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP               ),
+    ACCORD_COMMIT_INVALIDATE_REQ    (126, P2, writeTimeout, ACCORD,            
 () -> CommitSerializers.invalidate,         () -> 
AccordService.instance().verbHandler()                                ),
+    ACCORD_APPLY_RSP                (130, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> ApplySerializers.reply,               RESPONSE_HANDLER                   
                                         ),
+    ACCORD_APPLY_REQ                (129, P2, writeTimeout, ACCORD,            
 () -> ApplySerializers.request,             () -> 
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP              ),
+    ACCORD_RECOVER_RSP              (132, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> RecoverySerializers.reply,            RESPONSE_HANDLER                   
                                         ),
+    ACCORD_RECOVER_REQ              (131, P2, writeTimeout, ACCORD,            
 () -> RecoverySerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP            ),
+    ACCORD_BEGIN_INVALIDATE_RSP     (134, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> BeginInvalidationSerializers.reply,   RESPONSE_HANDLER                   
                                         ),
+    ACCORD_BEGIN_INVALIDATE_REQ     (133, P2, writeTimeout, ACCORD,            
 () -> BeginInvalidationSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP   ),
+    ACCORD_WAIT_COMMIT_RSP          (136, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> WaitOnCommitSerializer.reply,         RESPONSE_HANDLER                   
                                         ),
+    ACCORD_WAIT_COMMIT_REQ          (135, P2, writeTimeout, ACCORD,            
 () -> WaitOnCommitSerializer.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP        ),
+    ACCORD_INFORM_OF_TXNID_REQ      (137, P2, writeTimeout, ACCORD,            
 () -> InformOfTxnIdSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
+    ACCORD_INFORM_HOME_DURABLE_REQ  (138, P2, writeTimeout, ACCORD,            
 () -> InformHomeDurableSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
+    ACCORD_INFORM_DURABLE_REQ       (139, P2, writeTimeout, ACCORD,            
 () -> InformDurableSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
+    ACCORD_CHECK_STATUS_RSP         (141, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> CheckStatusSerializers.reply,         RESPONSE_HANDLER                   
                                         ),
+    ACCORD_CHECK_STATUS_REQ         (140, P2, writeTimeout, ACCORD,            
 () -> CheckStatusSerializers.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP       ),
+    ACCORD_GET_DEPS_RSP             (143, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> GetDepsSerializers.reply,             RESPONSE_HANDLER                   
                                         ),
+    ACCORD_GET_DEPS_REQ             (142, P2, writeTimeout, ACCORD,            
 () -> GetDepsSerializers.request,           () -> 
AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP           ),
 
 
     // generic failure response
@@ -467,6 +462,7 @@ public enum Verb
     static
     {
         Verb[] verbs = values();
+        checkForGaps(verbs);
         int max = -1;
         int minCustom = Integer.MAX_VALUE;
         for (Verb v : verbs)
@@ -515,6 +511,37 @@ public enum Verb
         idToCustomVerbMap = customIdMap;
     }
 
+    private static void checkForGaps(Verb[] array)
+    {
+        // If a verb is removed please add the id to this list, so this logic to detect gaps won't complain
+        IntHashSet allowedMissing = new IntHashSet();
+        for (int i : new int[]{ 7, 8, 12, 13, 17, 21, 25, 26, 32, 36, 64, 67, 68, 70, 71, 72, 73, 74, 75, 76, 77, 78, 81, 83, 85, 86, 89, 90, 92, 96,
+                                /* gap for accord, should fix when merging to trunk */ 116, 117, 118})
+            allowedMissing.add(i);
+        List<Verb> verbs = new ArrayList<>(Arrays.asList(array));
+        Collections.sort(verbs, Comparator.comparingInt(a -> a.id));
+        Verb previous = null;
+        List<String> errors = new ArrayList<>();
+        int minCustomVerb = Verb.UNUSED_CUSTOM_VERB.id;
+        for (Verb v : verbs)
+        {
+            if (v.id >= minCustomVerb)
+                continue; // ignore custom ids
+            if (allowedMissing.contains(v.id))
+                throw new AssertionError("Verb " + v + " used id " + v.id + " which is in the list of allowed missing; please remove from that list");
+            if (previous != null)
+            {
+                Verb finalPrevious = previous;
+                int[] missing = IntStream.range(previous.id + 1, v.id).filter(i -> !allowedMissing.contains(i)).toArray();
+                if (missing.length > 0)
+                    errors.add("Gap detected between verbs " + Arrays.asList(finalPrevious, v) + "; " + Arrays.asList(finalPrevious.id, v.id) + "; missing ids are " + Arrays.toString(missing));
+            }
+            previous = v;
+        }
+        if (!errors.isEmpty())
+            throw new AssertionError(String.join("\n", errors));
+    }
+
     public static Verb fromId(int id)
     {
         Verb[] verbs = idToVerbMap;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index dc329e8807..c7f1591e92 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -36,6 +36,7 @@ import accord.messages.ReplyContext;
 import accord.messages.Request;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.Messaging;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 
@@ -92,6 +93,18 @@ public class AccordMessageSink implements MessageSink
         return VerbMapping.instance.mapping.get(type);
     }
 
+    private final Messaging messaging;
+
+    public AccordMessageSink(Messaging messaging)
+    {
+        this.messaging = messaging;
+    }
+
+    public AccordMessageSink()
+    {
+        this(MessagingService.instance());
+    }
+
     @Override
     public void send(Node.Id to, Request request)
     {
@@ -100,7 +113,7 @@ public class AccordMessageSink implements MessageSink
         Message<Request> message = Message.out(verb, request);
         InetAddressAndPort endpoint = getEndpoint(to);
         logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
-        MessagingService.instance().send(message, endpoint);
+        messaging.send(message, endpoint);
     }
 
     @Override
@@ -111,7 +124,7 @@ public class AccordMessageSink implements MessageSink
         Message<Request> message = Message.out(verb, request);
         InetAddressAndPort endpoint = getEndpoint(to);
         logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
-        MessagingService.instance().sendWithCallback(message, endpoint, new 
AccordCallback<>((Callback<Reply>) callback));
+        messaging.sendWithCallback(message, endpoint, new 
AccordCallback<>((Callback<Reply>) callback));
     }
 
     @Override
@@ -122,6 +135,6 @@ public class AccordMessageSink implements MessageSink
         Preconditions.checkArgument(replyMsg.verb() == getVerb(reply.type()));
         InetAddressAndPort endpoint = getEndpoint(replyingToNode);
         logger.debug("Replying {} {} to {}", replyMsg.verb(), 
replyMsg.payload, endpoint);
-        MessagingService.instance().send(replyMsg, endpoint);
+        messaging.send(replyMsg, endpoint);
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index db659d623f..2f2dd2ce6d 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -80,7 +80,19 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
         AWAITING_SAVE,  // wait for writes to complete
         COMPLETING,
         FINISHED,
-        FAILED
+        FAILED;
+
+        boolean isComplete()
+        {
+            switch (this)
+            {
+                case FAILED:
+                case FINISHED:
+                    return true;
+                default:
+                    return false;
+            }
+        }
     }
 
     private State state = State.INITIALIZED;
@@ -179,7 +191,8 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
     private void fail(Throwable throwable)
     {
         Invariants.nonNull(throwable);
-        Invariants.checkArgument(state != State.FINISHED && state != State.FAILED, "Unexpected state %s", state);
+        if (state.isComplete())
+            throw new IllegalStateException("Unexpected state " + state, throwable);
         try
         {
             switch (state)
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
new file mode 100644
index 0000000000..82394eaed0
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import accord.local.Node;
+import accord.messages.InformOfTxnId;
+import accord.messages.SimpleReply;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.Messaging;
+import org.apache.cassandra.net.Verb;
+import org.mockito.Mockito;
+
+public class AccordMessageSinkTest
+{
+    @BeforeClass
+    public static void setup()
+    {
+        DatabaseDescriptor.clientInitialization();
+    }
+
+    @Test
+    public void informOfTxn()
+    {
+        // There was an issue where the reply was the wrong verb
+        // see CASSANDRA-18375
+        InformOfTxnId info = Mockito.mock(InformOfTxnId.class);
+        Message<InformOfTxnId> req = Message.builder(Verb.ACCORD_INFORM_OF_TXNID_REQ, info).build();
+        SimpleReply reply = SimpleReply.Ok;
+
+        Messaging messaging = Mockito.mock(Messaging.class);
+        AccordMessageSink sink = new AccordMessageSink(messaging);
+        sink.reply(new Node.Id(1), req, reply);
+
+        Mockito.verify(messaging).send(Mockito.any(), Mockito.any());
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to