[
https://issues.apache.org/jira/browse/CASSANDRA-15505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17022591#comment-17022591
]
David Capwell commented on CASSANDRA-15505:
-------------------------------------------
Ok, [~ifesdjeen] and I talked about this... so putting it here for history.
There is a desire to accumulate messages and process them later in the test:
{code}
List<DecoratedKey> accum = new CopyOnWriteArrayList<>(); // any thread-safe list
cluster.filters().from(1, 2).to(3).filter((from, to, msg) -> {
    accum.add(msg.payload.decoratedKey);
    return true;
});
{code}
Here msg is net.Message, but this doesn't work because of mixing class loaders.
{code}
List<DecoratedKey> accum = new CopyOnWriteArrayList<>(); // any thread-safe list
cluster.filters().from(1, 2).to(3).filter((from, to, msg) -> {
    accum.add(deserialize(msg).payload.decoratedKey);
    return true;
});
{code}
In this example msg is the dtest Message and the filter is still transferred.
This also fails because of mixing class loaders (DecoratedKey exists in the
test class loader, and again in each instance's class loader).
{code}
cluster.filters().from(1, 2).to(3).filter((from, to, msg) -> {
    Assert.assertEquals(<the Key I wanted to see>,
                        deserialize(msg).payload.decoratedKey);
    return true;
}).pass()
{code}
There was also a desire to assert a specific state when a message happens.
This doesn't work since the call site is MessagingService, so the assert will
cause an exception to be thrown in the instance and not in the test method.
The above needs to be rewritten as follows to work:
{code}
AtomicReference<Boolean> epicFail = new AtomicReference<>();
cluster.filters().allVerbs().from(1).to(2).filter(msg -> {
    epicFail.set(deserialize(msg).header.contains("totally doesn't compile but you get the point"));
    return true;
})
...
epicFail.get() == null // didn't see
epicFail.get() == true // assert
{code}
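The reason the assert has to move out of the filter can be shown without any
Cassandra classes. Below is a minimal sketch, assuming a plain Thread as a
stand-in for the instance-side messaging thread; CrossThreadAssertSketch and
its method names are made up for illustration and are not part of the dtest
API.

```java
import java.util.concurrent.atomic.AtomicReference;

final class CrossThreadAssertSketch {

    // Stand-in for an instance-side thread that invokes the filter callback.
    private static void runOnOtherThread(Runnable callback) {
        Thread instanceThread = new Thread(callback, "instance-thread");
        // Keep the demo quiet; by default the JVM would print the stack trace.
        instanceThread.setUncaughtExceptionHandler((thread, error) -> { });
        instanceThread.start();
        try {
            instanceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns true only if a throw inside the callback reaches the caller.
    static boolean throwReachesCaller() {
        try {
            runOnOtherThread(() -> {
                throw new AssertionError("raised on the instance thread");
            });
            return false; // join() returned normally: the error stayed over there
        } catch (AssertionError e) {
            return true;
        }
    }

    // Records the observation and leaves the actual assertion to the caller.
    static Boolean observe(String header, String expected) {
        AtomicReference<Boolean> seen = new AtomicReference<>(); // null = never called
        runOnOtherThread(() -> seen.set(header.contains(expected)));
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("throw reaches caller: " + throwReachesCaller());
        System.out.println("observed: " + observe("trace=abc", "trace="));
    }
}
```

The test method then asserts on the returned value itself, so a failure is
raised on the test thread where JUnit can see it.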
After all this, we agreed that the following is desirable:
{code}
CountDownLatch latch = new CountDownLatch(1);
cluster.filters().allVerbs().from(1).to(2).filter(msg -> {
    // THIS IS EXECUTED ON INSTANCE
    latch.countDown();
    return true;
});
{code}
This works only if the filter is not "transferred" to the instance and stays in
the test class loader (a transfer clones, whereas a lambda "captures", i.e. it
uses references rather than copies).
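The difference between capturing and cloning can be sketched in isolation.
The example below assumes that a "transfer" behaves like a Java serialization
round trip, which is only an approximation of what the dtest framework does
(it also crosses class loaders); CaptureVsTransferSketch and its helpers are
hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

final class CaptureVsTransferSketch {

    // A lambda must target a Serializable type to survive the round trip.
    interface SerializableRunnable extends Runnable, Serializable { }

    // Assumption: approximates a transfer by serializing and deserializing.
    static <T extends Serializable> T transfer(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // The lambda holds a reference, so the test-side counter is updated.
    static int countAfterCapturedCall() {
        AtomicInteger counter = new AtomicInteger();
        SerializableRunnable filter = () -> counter.incrementAndGet();
        filter.run();
        return counter.get();
    }

    // The clone carries its own copy of the counter; the original is untouched.
    static int countAfterTransferredCall() {
        AtomicInteger counter = new AtomicInteger();
        SerializableRunnable filter = () -> counter.incrementAndGet();
        transfer(filter).run();
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("captured:    " + countAfterCapturedCall());
        System.out.println("transferred: " + countAfterTransferredCall());
    }
}
```

A CountDownLatch is not even Serializable, so in this sketch a filter that
captures one could not be transferred at all.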
If you need to transfer, you can do it selectively:
{code}
IMessageFilters.Filter filter = cluster.filters()
    .allVerbs()
    .from(1)
    .to(2)
    .messagesMatching((from, to, msg) -> {
        cluster.get(from).runOnInstance(() -> {
            try {
                DataInputPlus.DataInputStreamPlus stream =
                    new DataInputPlus.DataInputStreamPlus(new ByteArrayInputStream(msg.bytes()));
                Message<?> instance =
                    (Message<?>) Verb.values()[msg.id()].serializer().deserialize(stream, to);
                System.out.println(instance.header.createdAtNanos);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        return true;
    }).pass();
{code}
Given all of the above, we settled on the following:
* drop the two-function method (on instance, off instance)
* matcher should have the signature (int, int, IMessage) -> Boolean (this is
optional)
* don't transfer the matcher
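
The agreed shape can be sketched as a single functional interface. This is a
rough illustration, not the actual dtest API: the IMessage stand-in below only
exposes id() and bytes() (the two accessors used earlier in this comment), and
Matcher, MatcherSketch and awaitMatch are assumed names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class MatcherSketch {

    // Hypothetical stand-in for the dtest message type.
    interface IMessage {
        int id();
        byte[] bytes();
    }

    // (from, to, message) -> boolean, as proposed in the list above.
    @FunctionalInterface
    interface Matcher {
        boolean matches(int from, int to, IMessage message);
    }

    // Invokes a non-transferred matcher from another thread and waits for it.
    static boolean awaitMatch(int from, int to) {
        CountDownLatch latch = new CountDownLatch(1);
        // Captures the latch by reference because the matcher is not cloned.
        Matcher matcher = (f, t, message) -> {
            latch.countDown();
            return true;
        };
        IMessage message = new IMessage() {
            @Override public int id() { return 0; }
            @Override public byte[] bytes() { return new byte[0]; }
        };
        // Stand-in for the instance thread that would call the matcher.
        Thread instanceThread = new Thread(() -> matcher.matches(from, to, message));
        instanceThread.start();
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```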
> Add message interceptors to in-jvm dtests
> -----------------------------------------
>
> Key: CASSANDRA-15505
> URL: https://issues.apache.org/jira/browse/CASSANDRA-15505
> Project: Cassandra
> Issue Type: New Feature
> Components: Test/dtest
> Reporter: Alex Petrov
> Assignee: Alex Petrov
> Priority: Normal
> Labels: pull-request-available
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> Currently we only have means to filter messages in in-jvm tests. We need a
> facility to intercept and modify the messages between nodes for testing
> purposes.