This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d6867b44f2747c4fdb1b09e14b2a356931f49ff9 Author: Qiang Zhao <[email protected]> AuthorDate: Thu Feb 17 13:23:03 2022 +0800 [Websocket] Fix ``ClassCastException`` when user create ``MultiTopicReader``. (#14316) (cherry picked from commit 7a7cf54b01420aeac855eea91529ea13bd753e52) --- .../org/apache/pulsar/websocket/ReaderHandler.java | 10 +- .../apache/pulsar/websocket/ReaderHandlerTest.java | 214 +++++++++++++++++++++ 2 files changed, 222 insertions(+), 2 deletions(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index ef0279d..2b87802 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MultiTopicsReaderImpl; import org.apache.pulsar.client.impl.ReaderImpl; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -103,8 +104,13 @@ public class ReaderHandler extends AbstractWebSocketHandler { } this.reader = builder.create(); - - this.subscription = ((ReaderImpl<?>) this.reader).getConsumer().getSubscription(); + if (reader instanceof MultiTopicsReaderImpl) { + this.subscription = ((MultiTopicsReaderImpl<?>) reader).getMultiTopicsConsumer().getSubscription(); + } else if (reader instanceof ReaderImpl) { + this.subscription = ((ReaderImpl<?>) reader).getConsumer().getSubscription(); + } else { + throw new IllegalArgumentException(String.format("Illegal Reader Type %s", reader.getClass())); + } if (!this.service.addReader(this)) { log.warn("[{}:{}] Failed to add reader handler for topic {}", request.getRemoteAddr(), request.getRemotePort(), topic); diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java new file mode 100644 index 0000000..0d2a13d --- /dev/null +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.MultiTopicsReaderImpl; +import org.apache.pulsar.client.impl.ReaderImpl; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.testng.Assert; +import org.testng.annotations.Test; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ReaderHandlerTest { + + @Test + @SuppressWarnings("unchecked") + public void testCreateReaderImp() throws IOException { + final String subName = "readerImpSubscription"; + // mock data + WebSocketService wss = mock(WebSocketService.class); + PulsarClient mockedClient = mock(PulsarClient.class); + when(wss.getPulsarClient()).thenReturn(mockedClient); + ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class); + when(mockedClient.newReader()).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder); + ReaderImpl<byte[]> mockedReader = mock(ReaderImpl.class); + when(mockedReaderBuilder.create()).thenReturn(mockedReader); + ConsumerImpl<byte[]> consumerImp = mock(ConsumerImpl.class); + when(consumerImp.getSubscription()).thenReturn(subName); + when(mockedReader.getConsumer()).thenReturn(consumerImp); + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); + // create reader handler + HttpServletResponse response = spy(HttpServletResponse.class); + ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response); + ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse); + // verify success + Assert.assertEquals(readerHandler.getSubscription(), subName); + } + + @Test + @SuppressWarnings("unchecked") + public void testCreateMultipleTopicReaderImp() throws IOException { + final String subName = "multipleTopicReaderImpSubscription"; + // mock data + WebSocketService wss = mock(WebSocketService.class); + PulsarClient mockedClient = mock(PulsarClient.class); + when(wss.getPulsarClient()).thenReturn(mockedClient); + ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class); + when(mockedClient.newReader()).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder); + MultiTopicsReaderImpl<byte[]> mockedReader = mock(MultiTopicsReaderImpl.class); + when(mockedReaderBuilder.create()).thenReturn(mockedReader); + MultiTopicsConsumerImpl<byte[]> consumerImp = mock(MultiTopicsConsumerImpl.class); + when(consumerImp.getSubscription()).thenReturn(subName); + when(mockedReader.getMultiTopicsConsumer()).thenReturn(consumerImp); + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); + // create reader handler + HttpServletResponse response = spy(HttpServletResponse.class); + ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response); + ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse); + // verify success + Assert.assertEquals(readerHandler.getSubscription(), subName); + } + + @Test + @SuppressWarnings("unchecked") + public void testCreateIllegalReaderImp() throws IOException { + // mock data + WebSocketService wss = mock(WebSocketService.class); + PulsarClient mockedClient = mock(PulsarClient.class); + when(wss.getPulsarClient()).thenReturn(mockedClient); + ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class); + when(mockedClient.newReader()).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.receiverQueueSize(anyInt())).thenReturn(mockedReaderBuilder); + IllegalReader illegalReader = new IllegalReader(); + when(mockedReaderBuilder.create()).thenReturn(illegalReader); + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); + // create reader handler + HttpServletResponse response = spy(HttpServletResponse.class); + ServletUpgradeResponse servletUpgradeResponse = new ServletUpgradeResponse(response); + new ReaderHandler(wss, request, servletUpgradeResponse); + // verify get error + verify(response, times(1)).sendError(anyInt(), anyString()); + } + + + static class IllegalReader implements Reader<byte[]> { + + @Override + public String getTopic() { + return null; + } + + @Override + public Message<byte[]> readNext() throws PulsarClientException { + return null; + } + + @Override + public Message<byte[]> readNext(int timeout, TimeUnit unit) throws PulsarClientException { + return null; + } + + @Override + public CompletableFuture<Message<byte[]>> readNextAsync() { + return null; + } + + @Override + public CompletableFuture<Void> closeAsync() { + return null; + } + + @Override + public boolean hasReachedEndOfTopic() { + return false; + } + + @Override + public boolean hasMessageAvailable() { + return false; + } + + @Override + public CompletableFuture<Boolean> hasMessageAvailableAsync() { + return null; + } + + @Override + public boolean isConnected() { + return false; + } + + @Override + public void seek(MessageId messageId) throws PulsarClientException { + + } + + @Override + public void seek(long timestamp) throws PulsarClientException { + + } + + @Override + public void seek(Function<String, Object> function) throws PulsarClientException { + + } + + @Override + public CompletableFuture<Void> seekAsync(Function<String, Object> function) { + return null; + } + + @Override + public CompletableFuture<Void> seekAsync(MessageId messageId) { + return null; + } + + @Override + public CompletableFuture<Void> seekAsync(long timestamp) { + return null; + } + + @Override + public void close() throws IOException { + + } + } +}
