merlimat closed pull request #1242: Change HTTP status code which WebSocket proxy returns to producer whe? URL: https://github.com/apache/incubator-pulsar/pull/1242
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index c25c947b7..3c401733a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Invocation; @@ -39,6 +40,7 @@ import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.websocket.WebSocketService; import org.apache.pulsar.websocket.service.ProxyServer; @@ -70,8 +72,12 @@ private ProxyServer proxyServer; private WebSocketService service; + private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5; + @BeforeMethod public void setup() throws Exception { + conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + super.internalSetup(); super.producerBaseSetup(); @@ -89,6 +95,7 @@ public void setup() throws Exception { @AfterMethod protected void cleanup() throws Exception { + super.resetConfig(); super.internalCleanup(); service.close(); proxyServer.stop(); @@ -97,7 +104,7 @@ protected void cleanup() throws Exception { @Test(timeOut = 10000) public void socketTest() throws Exception { - String consumerUri = "ws://localhost:" + port + final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover"; String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/my-property/use/my-ns/my-topic1"; String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/my-property/use/my-ns/my-topic1/"; @@ -167,32 +174,16 @@ public void socketTest() throws Exception { } Assert.assertEquals(produceSocket.getBuffer(), readSocket.getBuffer()); } finally { - ExecutorService executor = newFixedThreadPool(1); - try { - executor.submit(() -> { - try { - consumeClient1.stop(); - consumeClient2.stop(); - readClient.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("failed to close clients ", e); - } - executor.shutdownNow(); + stopWebSocketClient(consumeClient1, consumeClient2, readClient, produceClient); } } @Test(timeOut = 10000) - public void badConsumerTest() throws Exception { + public void emptySubcriptionConsumerTest() throws Exception { // Empty subcription name - String consumerUri = "ws://localhost:" + port - + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive"; + final String consumerUri = "ws://localhost:" + port + + "/ws/consumer/persistent/my-property/use/my-ns/my-topic2/?subscriptionType=Exclusive"; URI consumeUri = URI.create(consumerUri); WebSocketClient consumeClient1 = new WebSocketClient(); @@ -207,21 +198,148 @@ public void badConsumerTest() throws Exception { } catch (Exception e) { // Expected Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_BAD_REQUEST); } finally { - ExecutorService executor = newFixedThreadPool(1); + stopWebSocketClient(consumeClient1); + } + } + + @Test(timeOut = 10000) + public void conflictingConsumerTest() throws Exception { + final String consumerUri = "ws://localhost:" + port + + "/ws/consumer/persistent/my-property/use/my-ns/my-topic3/sub1?subscriptionType=Exclusive"; + URI consumeUri = URI.create(consumerUri); + + WebSocketClient consumeClient1 = new WebSocketClient(); + WebSocketClient consumeClient2 = new WebSocketClient(); + SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket(); + SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket(); + + try { + consumeClient1.start(); + ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest(); + Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1); + consumerFuture1.get(); + try { - executor.submit(() -> { - try { - consumeClient1.stop(); - log.info("proxy clients are stopped successfully"); - } catch (Exception e) { - log.error(e.getMessage()); - } - }).get(2, TimeUnit.SECONDS); + consumeClient2.start(); + ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest(); + Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2); + consumerFuture2.get(); + Assert.fail("should fail: conflicting subscription name"); } catch (Exception e) { - log.error("failed to close clients ", e); + // Expected + Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_CONFLICT); + } finally { + stopWebSocketClient(consumeClient2); } - executor.shutdownNow(); + } finally { + stopWebSocketClient(consumeClient1); + } + } + + @Test(timeOut = 10000) + public void conflictingProducerTest() throws Exception { + final String producerUri = "ws://localhost:" + port + + "/ws/producer/persistent/my-property/use/my-ns/my-topic4?producerName=my-producer"; + URI produceUri = URI.create(producerUri); + + WebSocketClient produceClient1 = new WebSocketClient(); + WebSocketClient produceClient2 = new WebSocketClient(); + SimpleProducerSocket produceSocket1 = new SimpleProducerSocket(); + SimpleProducerSocket produceSocket2 = new SimpleProducerSocket(); + + try { + produceClient1.start(); + ClientUpgradeRequest produceRequest1 = new ClientUpgradeRequest(); + Future<Session> producerFuture1 = produceClient1.connect(produceSocket1, produceUri, produceRequest1); + producerFuture1.get(); + + try { + produceClient2.start(); + ClientUpgradeRequest produceRequest2 = new ClientUpgradeRequest(); + Future<Session> producerFuture2 = produceClient2.connect(produceSocket2, produceUri, produceRequest2); + producerFuture2.get(); + Assert.fail("should fail: conflicting producer name"); + } catch (Exception e) { + // Expected + Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_CONFLICT); + } finally { + stopWebSocketClient(produceClient2); + } + } finally { + stopWebSocketClient(produceClient1); + } + } + + @Test(timeOut = 30000) + public void producerBacklogQuotaExceededTest() throws Exception { + admin.namespaces().createNamespace("my-property/use/ns-ws-quota"); + admin.namespaces().setBacklogQuota("my-property/use/ns-ws-quota", + new BacklogQuota(10, BacklogQuota.RetentionPolicy.producer_request_hold)); + + final String topic = "my-property/use/ns-ws-quota/my-topic5"; + final String subscription = "my-sub"; + final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/" + subscription; + final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic; + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + WebSocketClient consumeClient = new WebSocketClient(); + WebSocketClient produceClient1 = new WebSocketClient(); + WebSocketClient produceClient2 = new WebSocketClient(); + + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + SimpleProducerSocket produceSocket1 = new SimpleProducerSocket(); + SimpleProducerSocket produceSocket2 = new SimpleProducerSocket(); + + // Create subscription + try { + consumeClient.start(); + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + consumerFuture.get(); + } finally { + stopWebSocketClient(consumeClient); + } + + // Fill the backlog + try { + produceClient1.start(); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + Future<Session> producerFuture = produceClient1.connect(produceSocket1, produceUri, produceRequest); + producerFuture.get(); + produceSocket1.sendMessage(100); + } finally { + stopWebSocketClient(produceClient1); + } + + Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000); + + // New producer fails to connect + try { + produceClient2.start(); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + Future<Session> producerFuture = produceClient2.connect(produceSocket2, produceUri, produceRequest); + producerFuture.get(); + Assert.fail("should fail: backlog quota exceeded"); + } catch (Exception e) { + // Expected + Assert.assertTrue(e.getCause() instanceof UpgradeException); + Assert.assertEquals(((UpgradeException) e.getCause()).getResponseStatusCode(), + HttpServletResponse.SC_SERVICE_UNAVAILABLE); + } finally { + stopWebSocketClient(produceClient2); + admin.persistentTopics().skipAllMessages("persistent://" + topic, subscription); + admin.persistentTopics().delete("persistent://" + topic); + admin.namespaces().removeBacklogQuota("my-property/use/ns-ws-quota"); + admin.namespaces().deleteNamespace("my-property/use/ns-ws-quota"); } } @@ -232,7 +350,7 @@ public void badConsumerTest() throws Exception { */ @Test(timeOut = 10000) public void testProxyStats() throws Exception { - final String topic = "my-property/use/my-ns/my-topic2"; + final String topic = "my-property/use/my-ns/my-topic6"; final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic + "/"; @@ -299,9 +417,7 @@ public void testProxyStats() throws Exception { verifyTopicStat(client, baseUrl, topic); } finally { - consumeClient1.stop(); - produceClient.stop(); - log.info("proxy clients are stopped successfully"); + stopWebSocketClient(consumeClient1, produceClient); } } @@ -361,5 +477,23 @@ private void verifyProxyStats(Client client, String baseUrl, String topic) { Assert.assertNotNull(producerStats.remoteConnection); } + private void stopWebSocketClient(WebSocketClient... clients) { + ExecutorService executor = newFixedThreadPool(1); + try { + executor.submit(() -> { + try { + for (WebSocketClient client : clients) { + client.stop(); + } + log.info("proxy clients are stopped successfully"); + } catch (Exception e) { + log.error(e.getMessage()); + } + }).get(2, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("failed to close proxy clients", e); + } + } + private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index 91e62b72f..ad6f91042 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -48,7 +48,6 @@ protected final String topic; protected final Map<String, String> queryParams; - protected final boolean authResult; public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) { this.service = service; @@ -59,11 +58,9 @@ public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest req request.getParameterMap().forEach((key, values) -> { queryParams.put(key, values[0]); }); - - authResult = checkAuth(response); } - private boolean checkAuth(ServletUpgradeResponse response) { + protected boolean checkAuth(ServletUpgradeResponse response) { String authRole = "<none>"; AuthenticationDataSource authenticationData = new AuthenticationDataHttps(request); if (service.isAuthenticationEnabled()) { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index b392e0ca7..6edcf2a4a 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.util.DateFormatter; @@ -64,7 +65,7 @@ */ public class ConsumerHandler extends AbstractWebSocketHandler { - private final String subscription; + private String subscription = null; private final ConsumerConfiguration conf; private Consumer consumer; @@ -80,18 +81,19 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) { super(service, request, response); - this.subscription = extractSubscription(request); this.conf = getConsumerConfiguration(); this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize(); this.numMsgsDelivered = new LongAdder(); this.numBytesDelivered = new LongAdder(); this.numMsgsAcked = new LongAdder(); - if (!authResult) { - return; - } - try { + // checkAuth() should be called after assigning a value to this.subscription + this.subscription = extractSubscription(request); + if (!checkAuth(response)) { + return; + } + this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf); if (!this.service.addConsumer(this)) { log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(), @@ -100,12 +102,9 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser } catch (Exception e) { log.warn("[{}:{}] Failed in creating subscription {} on topic {}", request.getRemoteAddr(), request.getRemotePort(), subscription, topic, e); - boolean configError = e instanceof IllegalArgumentException; - int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST - : HttpServletResponse.SC_INTERNAL_SERVER_ERROR; - String errorMsg = configError ? "Invalid query-param " + e.getMessage() : "Failed to subscribe"; + try { - response.sendError(errorCode, errorMsg); + response.sendError(getErrorCode(e), getErrorMessage(e)); } catch (IOException e1) { log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1); @@ -113,6 +112,24 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser } } + private static int getErrorCode(Exception e) { + if (e instanceof IllegalArgumentException) { + return HttpServletResponse.SC_BAD_REQUEST; + } else if (e instanceof ConsumerBusyException) { + return HttpServletResponse.SC_CONFLICT; + } else { + return HttpServletResponse.SC_INTERNAL_SERVER_ERROR; + } + } + + private static String getErrorMessage(Exception e) { + if (e instanceof IllegalArgumentException) { + return "Invalid query params: " + e.getMessage(); + } else { + return "Failed to subscribe: " + e.getMessage(); + } + } + private void receiveMessage() { if (log.isDebugEnabled()) { log.debug("[{}:{}] [{}] [{}] Receive next message", request.getRemoteAddr(), request.getRemotePort(), topic, subscription); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index cb2e288a4..7df14a558 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme; import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException; import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -85,7 +87,7 @@ public ProducerHandler(WebSocketService service, HttpServletRequest request, Ser this.numMsgsFailed = new LongAdder(); this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); - if (!authResult) { + if (!checkAuth(response)) { return; } @@ -114,6 +116,8 @@ private static int getErrorCode(Exception e) { return HttpServletResponse.SC_BAD_REQUEST; } else if (e instanceof ProducerBusyException) { return HttpServletResponse.SC_CONFLICT; + } else if (e instanceof ProducerBlockedQuotaExceededError || e instanceof ProducerBlockedQuotaExceededException) { + return HttpServletResponse.SC_SERVICE_UNAVAILABLE; } else { return HttpServletResponse.SC_INTERNAL_SERVER_ERROR; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index 4d6c27185..d643df2b6 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -82,7 +82,7 @@ public ReaderHandler(WebSocketService service, HttpServletRequest request, Servl this.numMsgsDelivered = new LongAdder(); this.numBytesDelivered = new LongAdder(); - if (!authResult) { + if (!checkAuth(response)) { return; } @@ -97,7 +97,8 @@ public ReaderHandler(WebSocketService service, HttpServletRequest request, Servl log.warn("[{}:{}] Failed in creating reader {} on topic {}", request.getRemoteAddr(), request.getRemotePort(), subscription, topic, e); try { - response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create reader"); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, + "Failed to create reader: " + e.getMessage()); } catch (IOException e1) { log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services