This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit edc242da6a95e3c460393a9005dfbfb755afc42c Author: Pratik Katti <[email protected]> AuthorDate: Mon May 25 20:58:25 2026 +0530 Return 400 for invalid reader messageId query parameter (#25865) (cherry picked from commit b406518fd2633c842d63d31ea16fe94e015ccd88) --- .../org/apache/pulsar/websocket/ReaderHandler.java | 36 ++++++-- .../apache/pulsar/websocket/ReaderHandlerTest.java | 97 +++++++++++++++++++++- 2 files changed, 121 insertions(+), 12 deletions(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index 7cee6005f05..c4bb952c628 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -121,11 +121,17 @@ public class ReaderHandler extends AbstractWebSocketHandler { } allowConnect = true; } catch (Exception e) { - log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(), - request.getRemotePort(), subscription, topic, e); + int errorCode = getErrorCode(e); + boolean isKnownError = errorCode != HttpServletResponse.SC_INTERNAL_SERVER_ERROR; + if (isKnownError) { + log.warn("[{}:{}] Failed in creating reader {} on topic {}: {}", request.getRemoteAddr(), + request.getRemotePort(), subscription, topic, e.getMessage()); + } else { + log.error("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(), + request.getRemotePort(), subscription, topic, e); + } try { - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, - "Failed to create reader: " + e.getMessage()); + response.sendError(errorCode, getErrorMessage(e)); } catch (IOException e1) { log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1); @@ -340,13 +346,25 @@ public class ReaderHandler extends AbstractWebSocketHandler { return size; } - private MessageId getMessageId() throws IOException { + private MessageId getMessageId() { MessageId messageId = MessageId.latest; - if (isNotBlank(queryParams.get("messageId"))) { - if (queryParams.get("messageId").equals("earliest")) { + String messageIdParam = queryParams.get("messageId"); + if (isNotBlank(messageIdParam)) { + if (messageIdParam.equals("earliest")) { messageId = MessageId.earliest; - } else if (!queryParams.get("messageId").equals("latest")) { - messageId = MessageIdImpl.fromByteArray(Base64.getDecoder().decode(queryParams.get("messageId"))); + } else if (!messageIdParam.equals("latest")) { + final byte[] decoded; + try { + decoded = Base64.getDecoder().decode(messageIdParam); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid messageId base64 value", e); + } + + try { + messageId = MessageIdImpl.fromByteArray(decoded); + } catch (IOException | RuntimeException e) { + throw new IllegalArgumentException("Invalid messageId value", e); + } } } return messageId; diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java index a79899ab6fa..9db1561ec5a 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/ReaderHandlerTest.java @@ -21,17 +21,22 @@ package org.apache.pulsar.websocket; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.io.IOException; +import java.util.Base64; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; @@ -43,12 +48,98 @@ import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsReaderImpl; import org.apache.pulsar.client.impl.ReaderImpl; +import org.apache.pulsar.common.api.proto.MessageIdData; import org.eclipse.jetty.ee8.websocket.server.JettyServerUpgradeResponse; import org.testng.Assert; import org.testng.annotations.Test; public class ReaderHandlerTest { + @Test + @SuppressWarnings("unchecked") + public void testInvalidMessageIdBase64ReturnsBadRequest() throws IOException { + WebSocketService wss = mock(WebSocketService.class); + PulsarClient mockedClient = mock(PulsarClient.class); + when(wss.getPulsarClient()).thenReturn(mockedClient); + ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class); + when(mockedClient.newReader()).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder); + // Ensure the chain doesn't NPE after startMessageId() if parsing unexpectedly succeeds. + when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder); + + Map<String, String[]> params = new HashMap<>(); + params.put("messageId", new String[] { "invalidMessageId" }); + + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic"); + when(request.getParameterMap()).thenReturn(params); + + JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class); + new ReaderHandler(wss, request, servletUpgradeResponse); + + verify(servletUpgradeResponse, times(1)) + .sendError(eq(HttpServletResponse.SC_BAD_REQUEST), anyString()); + } + + @Test + @SuppressWarnings("unchecked") + public void testInvalidMessageIdBytesReturnsBadRequest() throws IOException { + WebSocketService wss = mock(WebSocketService.class); + PulsarClient mockedClient = mock(PulsarClient.class); + when(wss.getPulsarClient()).thenReturn(mockedClient); + ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class); + when(mockedClient.newReader()).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder); + // Ensure the chain doesn't NPE after startMessageId() if parsing unexpectedly succeeds. + when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder); + + // "AQID" is valid Base64, but it doesn't decode into a valid Pulsar MessageId structure. + Map<String, String[]> params = new HashMap<>(); + params.put("messageId", new String[] { "AQID" }); + + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic"); + when(request.getParameterMap()).thenReturn(params); + + JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class); + new ReaderHandler(wss, request, servletUpgradeResponse); + + verify(servletUpgradeResponse, times(1)) + .sendError(eq(HttpServletResponse.SC_BAD_REQUEST), anyString()); + } + + @Test + @SuppressWarnings("unchecked") + public void testInvalidMessageIdRuntimeParseFailureReturnsBadRequest() throws IOException { + WebSocketService wss = mock(WebSocketService.class); + PulsarClient mockedClient = mock(PulsarClient.class); + when(wss.getPulsarClient()).thenReturn(mockedClient); + ReaderBuilder<byte[]> mockedReaderBuilder = mock(ReaderBuilder.class); + when(mockedClient.newReader()).thenReturn(mockedReaderBuilder); + when(mockedReaderBuilder.topic(any())).thenReturn(mockedReaderBuilder); + // Ensure the chain doesn't NPE after startMessageId() if parsing unexpectedly succeeds. + when(mockedReaderBuilder.startMessageId(any())).thenReturn(mockedReaderBuilder); + + MessageIdData invalidBatchMessageId = new MessageIdData() + .setLedgerId(1) + .setEntryId(2) + .setBatchIndex(0) + .setBatchSize(-1); + Map<String, String[]> params = new HashMap<>(); + params.put("messageId", new String[] { + Base64.getEncoder().encodeToString(invalidBatchMessageId.toByteArray()) }); + + HttpServletRequest request = mock(HttpServletRequest.class); + when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic"); + when(request.getParameterMap()).thenReturn(params); + + JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class); + new ReaderHandler(wss, request, servletUpgradeResponse); + + verify(servletUpgradeResponse, times(1)) + .sendError(eq(HttpServletResponse.SC_BAD_REQUEST), anyString()); + } + @Test @SuppressWarnings("unchecked") public void testCreateReaderImp() throws IOException { @@ -68,7 +159,7 @@ public class ReaderHandlerTest { when(consumerImp.getSubscription()).thenReturn(subName); when(mockedReader.getConsumer()).thenReturn(consumerImp); HttpServletRequest request = mock(HttpServletRequest.class); - when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); + when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic"); // create reader handler JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class); ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse); @@ -97,7 +188,7 @@ public class ReaderHandlerTest { when(consumerImp.getSubscription()).thenReturn(subName); when(mockedReader.getMultiTopicsConsumer()).thenReturn(consumerImp); HttpServletRequest request = mock(HttpServletRequest.class); - when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); + when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic"); // create reader handler JettyServerUpgradeResponse servletUpgradeResponse = mock(JettyServerUpgradeResponse.class); ReaderHandler readerHandler = new ReaderHandler(wss, request, servletUpgradeResponse); @@ -122,7 +213,7 @@ public class ReaderHandlerTest { IllegalReader illegalReader = new IllegalReader(); when(mockedReaderBuilder.create()).thenReturn(illegalReader); HttpServletRequest request = mock(HttpServletRequest.class); - when(request.getRequestURI()).thenReturn("/ws/v2/producer/persistent/my-property/my-ns/my-topic"); + when(request.getRequestURI()).thenReturn("/ws/v2/reader/persistent/my-property/my-ns/my-topic"); // create reader handler JettyServerUpgradeResponse servletUpgradeResponse = spy(JettyServerUpgradeResponse.class); new ReaderHandler(wss, request, servletUpgradeResponse);
