This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dd6d3720f0a [WebSocket] Fix MultiTopicReader#getConsumer
ClassCastException (#15534)
dd6d3720f0a is described below
commit dd6d3720f0a1d10df9885592ff0e0f9481325f2a
Author: Michael Marshall <[email protected]>
AuthorDate: Tue May 10 22:46:44 2022 -0500
[WebSocket] Fix MultiTopicReader#getConsumer ClassCastException (#15534)
### Motivation
This fixes an issue similar to the one solved in
https://github.com/apache/pulsar/pull/14316. When the `reader` is a
`MultiTopicReader`, the `getConsumer()` method currently throws a
`ClassCastException`.
### Modifications
* Update `MultiTopicReader#getConsumer` so that it safely casts the
`reader`.
* Update the `ReaderHandler` constructor to use the `getConsumer` method.
### Verifying this change
I expanded existing tests to cover the scenario that would have previously
failed.
### Does this pull request potentially affect one of the following parts:
No, this is not a breaking change.
---
.../java/org/apache/pulsar/websocket/ReaderHandler.java | 16 ++++++++++------
.../org/apache/pulsar/websocket/ReaderHandlerTest.java | 4 ++++
2 files changed, 14 insertions(+), 6 deletions(-)
diff --git
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 54688ddb6b9..6510386aef9 100644
---
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -104,13 +104,11 @@ public class ReaderHandler extends
AbstractWebSocketHandler {
}
this.reader = builder.create();
- if (reader instanceof MultiTopicsReaderImpl) {
- this.subscription = ((MultiTopicsReaderImpl<?>)
reader).getMultiTopicsConsumer().getSubscription();
- } else if (reader instanceof ReaderImpl) {
- this.subscription = ((ReaderImpl<?>)
reader).getConsumer().getSubscription();
- } else {
+ Consumer<?> consumer = getConsumer();
+ if (consumer == null) {
throw new IllegalArgumentException(String.format("Illegal
Reader Type %s", reader.getClass()));
}
+ this.subscription = consumer.getSubscription();
if (!this.service.addReader(this)) {
log.warn("[{}:{}] Failed to add reader handler for topic {}",
request.getRemoteAddr(),
request.getRemotePort(), topic);
@@ -272,7 +270,13 @@ public class ReaderHandler extends
AbstractWebSocketHandler {
}
public Consumer<?> getConsumer() {
- return reader != null ? ((ReaderImpl<?>) reader).getConsumer() : null;
+ if (reader instanceof MultiTopicsReaderImpl) {
+ return ((MultiTopicsReaderImpl<?>)
reader).getMultiTopicsConsumer();
+ } else if (reader instanceof ReaderImpl) {
+ return ((ReaderImpl<?>) reader).getConsumer();
+ } else {
+ return null;
+ }
}
public String getSubscription() {
diff --git
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
index 0d2a13d1a74..7dfa8b6e314 100644
---
a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
+++
b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java
@@ -74,6 +74,8 @@ public class ReaderHandlerTest {
ReaderHandler readerHandler = new ReaderHandler(wss, request,
servletUpgradeResponse);
// verify success
Assert.assertEquals(readerHandler.getSubscription(), subName);
+ // Verify consumer is returned
+ readerHandler.getConsumer();
}
@Test
@@ -102,6 +104,8 @@ public class ReaderHandlerTest {
ReaderHandler readerHandler = new ReaderHandler(wss, request,
servletUpgradeResponse);
// verify success
Assert.assertEquals(readerHandler.getSubscription(), subName);
+ // Verify consumer is successfully returned
+ readerHandler.getConsumer();
}
@Test