merlimat closed pull request #1242: Change HTTP status code which WebSocket 
proxy returns to producer whe?
URL: https://github.com/apache/incubator-pulsar/pull/1242
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index c25c947b7..3c401733a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.client.Invocation;
@@ -39,6 +40,7 @@
 
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.websocket.WebSocketService;
 import org.apache.pulsar.websocket.service.ProxyServer;
@@ -70,8 +72,12 @@
     private ProxyServer proxyServer;
     private WebSocketService service;
 
+    private static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
+
     @BeforeMethod
     public void setup() throws Exception {
+        
conf.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+
         super.internalSetup();
         super.producerBaseSetup();
 
@@ -89,6 +95,7 @@ public void setup() throws Exception {
 
     @AfterMethod
     protected void cleanup() throws Exception {
+        super.resetConfig();
         super.internalCleanup();
         service.close();
         proxyServer.stop();
@@ -97,7 +104,7 @@ protected void cleanup() throws Exception {
 
     @Test(timeOut = 10000)
     public void socketTest() throws Exception {
-        String consumerUri = "ws://localhost:" + port
+        final String consumerUri = "ws://localhost:" + port
                 + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
         String readerUri = "ws://localhost:" + port + 
"/ws/reader/persistent/my-property/use/my-ns/my-topic1";
         String producerUri = "ws://localhost:" + port + 
"/ws/producer/persistent/my-property/use/my-ns/my-topic1/";
@@ -167,32 +174,16 @@ public void socketTest() throws Exception {
             }
             Assert.assertEquals(produceSocket.getBuffer(), 
readSocket.getBuffer());
         } finally {
-            ExecutorService executor = newFixedThreadPool(1);
-            try {
-                executor.submit(() -> {
-                    try {
-                        consumeClient1.stop();
-                        consumeClient2.stop();
-                        readClient.stop();
-                        produceClient.stop();
-                        log.info("proxy clients are stopped successfully");
-                    } catch (Exception e) {
-                        log.error(e.getMessage());
-                    }
-                }).get(2, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                log.error("failed to close clients ", e);
-            }
-            executor.shutdownNow();
+            stopWebSocketClient(consumeClient1, consumeClient2, readClient, 
produceClient);
         }
     }
 
     @Test(timeOut = 10000)
-    public void badConsumerTest() throws Exception {
+    public void emptySubcriptionConsumerTest() throws Exception {
 
         // Empty subcription name
-        String consumerUri = "ws://localhost:" + port
-                + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic1/?subscriptionType=Exclusive";
+        final String consumerUri = "ws://localhost:" + port
+                + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic2/?subscriptionType=Exclusive";
         URI consumeUri = URI.create(consumerUri);
 
         WebSocketClient consumeClient1 = new WebSocketClient();
@@ -207,21 +198,148 @@ public void badConsumerTest() throws Exception {
         } catch (Exception e) {
             // Expected
             Assert.assertTrue(e.getCause() instanceof UpgradeException);
+            Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                    HttpServletResponse.SC_BAD_REQUEST);
         } finally {
-            ExecutorService executor = newFixedThreadPool(1);
+            stopWebSocketClient(consumeClient1);
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void conflictingConsumerTest() throws Exception {
+        final String consumerUri = "ws://localhost:" + port
+                + 
"/ws/consumer/persistent/my-property/use/my-ns/my-topic3/sub1?subscriptionType=Exclusive";
+        URI consumeUri = URI.create(consumerUri);
+
+        WebSocketClient consumeClient1 = new WebSocketClient();
+        WebSocketClient consumeClient2 = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
+        SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
+
+        try {
+            consumeClient1.start();
+            ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
+            Future<Session> consumerFuture1 = 
consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
+            consumerFuture1.get();
+
             try {
-                executor.submit(() -> {
-                    try {
-                        consumeClient1.stop();
-                        log.info("proxy clients are stopped successfully");
-                    } catch (Exception e) {
-                        log.error(e.getMessage());
-                    }
-                }).get(2, TimeUnit.SECONDS);
+                consumeClient2.start();
+                ClientUpgradeRequest consumeRequest2 = new 
ClientUpgradeRequest();
+                Future<Session> consumerFuture2 = 
consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
+                consumerFuture2.get();
+                Assert.fail("should fail: conflicting subscription name");
             } catch (Exception e) {
-                log.error("failed to close clients ", e);
+                // Expected
+                Assert.assertTrue(e.getCause() instanceof UpgradeException);
+                Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                        HttpServletResponse.SC_CONFLICT);
+            } finally {
+                stopWebSocketClient(consumeClient2);
             }
-            executor.shutdownNow();
+        } finally {
+            stopWebSocketClient(consumeClient1);
+        }
+    }
+
+    @Test(timeOut = 10000)
+    public void conflictingProducerTest() throws Exception {
+        final String producerUri = "ws://localhost:" + port
+                + 
"/ws/producer/persistent/my-property/use/my-ns/my-topic4?producerName=my-producer";
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient produceClient1 = new WebSocketClient();
+        WebSocketClient produceClient2 = new WebSocketClient();
+        SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
+        SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
+
+        try {
+            produceClient1.start();
+            ClientUpgradeRequest produceRequest1 = new ClientUpgradeRequest();
+            Future<Session> producerFuture1 = 
produceClient1.connect(produceSocket1, produceUri, produceRequest1);
+            producerFuture1.get();
+
+            try {
+                produceClient2.start();
+                ClientUpgradeRequest produceRequest2 = new 
ClientUpgradeRequest();
+                Future<Session> producerFuture2 = 
produceClient2.connect(produceSocket2, produceUri, produceRequest2);
+                producerFuture2.get();
+                Assert.fail("should fail: conflicting producer name");
+            } catch (Exception e) {
+                // Expected
+                Assert.assertTrue(e.getCause() instanceof UpgradeException);
+                Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                        HttpServletResponse.SC_CONFLICT);
+            } finally {
+                stopWebSocketClient(produceClient2);
+            }
+        } finally {
+            stopWebSocketClient(produceClient1);
+        }
+    }
+
+    @Test(timeOut = 30000)
+    public void producerBacklogQuotaExceededTest() throws Exception {
+        admin.namespaces().createNamespace("my-property/use/ns-ws-quota");
+        admin.namespaces().setBacklogQuota("my-property/use/ns-ws-quota",
+                new BacklogQuota(10, 
BacklogQuota.RetentionPolicy.producer_request_hold));
+
+        final String topic = "my-property/use/ns-ws-quota/my-topic5";
+        final String subscription = "my-sub";
+        final String consumerUri = "ws://localhost:" + port + 
"/ws/consumer/persistent/" + topic + "/" + subscription;
+        final String producerUri = "ws://localhost:" + port + 
"/ws/producer/persistent/" + topic;
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient = new WebSocketClient();
+        WebSocketClient produceClient1 = new WebSocketClient();
+        WebSocketClient produceClient2 = new WebSocketClient();
+
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        SimpleProducerSocket produceSocket1 = new SimpleProducerSocket();
+        SimpleProducerSocket produceSocket2 = new SimpleProducerSocket();
+
+        // Create subscription
+        try {
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = 
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            consumerFuture.get();
+        } finally {
+            stopWebSocketClient(consumeClient);
+        }
+
+        // Fill the backlog
+        try {
+            produceClient1.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient1.connect(produceSocket1, produceUri, produceRequest);
+            producerFuture.get();
+            produceSocket1.sendMessage(100);
+        } finally {
+            stopWebSocketClient(produceClient1);
+        }
+
+        Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA + 1) * 1000);
+
+        // New producer fails to connect
+        try {
+            produceClient2.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient2.connect(produceSocket2, produceUri, produceRequest);
+            producerFuture.get();
+            Assert.fail("should fail: backlog quota exceeded");
+        } catch (Exception e) {
+            // Expected
+            Assert.assertTrue(e.getCause() instanceof UpgradeException);
+            Assert.assertEquals(((UpgradeException) 
e.getCause()).getResponseStatusCode(),
+                    HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+        } finally {
+            stopWebSocketClient(produceClient2);
+            admin.persistentTopics().skipAllMessages("persistent://" + topic, 
subscription);
+            admin.persistentTopics().delete("persistent://" + topic);
+            
admin.namespaces().removeBacklogQuota("my-property/use/ns-ws-quota");
+            admin.namespaces().deleteNamespace("my-property/use/ns-ws-quota");
         }
     }
 
@@ -232,7 +350,7 @@ public void badConsumerTest() throws Exception {
      */
     @Test(timeOut = 10000)
     public void testProxyStats() throws Exception {
-        final String topic = "my-property/use/my-ns/my-topic2";
+        final String topic = "my-property/use/my-ns/my-topic6";
         final String consumerUri = "ws://localhost:" + port + 
"/ws/consumer/persistent/" + topic
                 + "/my-sub?subscriptionType=Failover";
         final String producerUri = "ws://localhost:" + port + 
"/ws/producer/persistent/" + topic + "/";
@@ -299,9 +417,7 @@ public void testProxyStats() throws Exception {
             verifyTopicStat(client, baseUrl, topic);
 
         } finally {
-            consumeClient1.stop();
-            produceClient.stop();
-            log.info("proxy clients are stopped successfully");
+            stopWebSocketClient(consumeClient1, produceClient);
         }
     }
 
@@ -361,5 +477,23 @@ private void verifyProxyStats(Client client, String 
baseUrl, String topic) {
         Assert.assertNotNull(producerStats.remoteConnection);
     }
 
+    private void stopWebSocketClient(WebSocketClient... clients) {
+        ExecutorService executor = newFixedThreadPool(1);
+        try {
+            executor.submit(() -> {
+                try {
+                    for (WebSocketClient client : clients) {
+                        client.stop();
+                    }
+                    log.info("proxy clients are stopped successfully");
+                } catch (Exception e) {
+                    log.error(e.getMessage());
+                }
+            }).get(2, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            log.error("failed to close proxy clients", e);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(ProxyPublishConsumeTest.class);
 }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
index 91e62b72f..ad6f91042 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java
@@ -48,7 +48,6 @@
 
     protected final String topic;
     protected final Map<String, String> queryParams;
-    protected final boolean authResult;
 
     public AbstractWebSocketHandler(WebSocketService service, 
HttpServletRequest request, ServletUpgradeResponse response) {
         this.service = service;
@@ -59,11 +58,9 @@ public AbstractWebSocketHandler(WebSocketService service, 
HttpServletRequest req
         request.getParameterMap().forEach((key, values) -> {
             queryParams.put(key, values[0]);
         });
-
-        authResult = checkAuth(response);
     }
 
-    private boolean checkAuth(ServletUpgradeResponse response) {
+    protected boolean checkAuth(ServletUpgradeResponse response) {
         String authRole = "<none>";
         AuthenticationDataSource authenticationData = new 
AuthenticationDataHttps(request);
         if (service.isAuthenticationEnabled()) {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index b392e0ca7..6edcf2a4a 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.MessageId;
+import 
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -64,7 +65,7 @@
  */
 public class ConsumerHandler extends AbstractWebSocketHandler {
 
-    private final String subscription;
+    private String subscription = null;
     private final ConsumerConfiguration conf;
     private Consumer consumer;
 
@@ -80,18 +81,19 @@
 
     public ConsumerHandler(WebSocketService service, HttpServletRequest 
request, ServletUpgradeResponse response) {
         super(service, request, response);
-        this.subscription = extractSubscription(request);
         this.conf = getConsumerConfiguration();
         this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : 
conf.getReceiverQueueSize();
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
         this.numMsgsAcked = new LongAdder();
 
-        if (!authResult) {
-            return;
-        }
-
         try {
+            // checkAuth() should be called after assigning a value to 
this.subscription
+            this.subscription = extractSubscription(request);
+            if (!checkAuth(response)) {
+                return;
+            }
+
             this.consumer = service.getPulsarClient().subscribe(topic, 
subscription, conf);
             if (!this.service.addConsumer(this)) {
                 log.warn("[{}:{}] Failed to add consumer handler for topic 
{}", request.getRemoteAddr(),
@@ -100,12 +102,9 @@ public ConsumerHandler(WebSocketService service, 
HttpServletRequest request, Ser
         } catch (Exception e) {
             log.warn("[{}:{}] Failed in creating subscription {} on topic {}", 
request.getRemoteAddr(),
                     request.getRemotePort(), subscription, topic, e);
-            boolean configError = e instanceof IllegalArgumentException;
-            int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST
-                    : HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
-            String errorMsg = configError ? "Invalid query-param " + 
e.getMessage() : "Failed to subscribe";
+
             try {
-                response.sendError(errorCode, errorMsg);
+                response.sendError(getErrorCode(e), getErrorMessage(e));
             } catch (IOException e1) {
                 log.warn("[{}:{}] Failed to send error: {}", 
request.getRemoteAddr(), request.getRemotePort(),
                         e1.getMessage(), e1);
@@ -113,6 +112,24 @@ public ConsumerHandler(WebSocketService service, 
HttpServletRequest request, Ser
         }
     }
 
+    private static int getErrorCode(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return HttpServletResponse.SC_BAD_REQUEST;
+        } else if (e instanceof ConsumerBusyException) {
+            return HttpServletResponse.SC_CONFLICT;
+        } else {
+            return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+        }
+    }
+
+    private static String getErrorMessage(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return "Invalid query params: " + e.getMessage();
+        } else {
+            return "Failed to subscribe: " + e.getMessage();
+        }
+    }
+
     private void receiveMessage() {
         if (log.isDebugEnabled()) {
             log.debug("[{}:{}] [{}] [{}] Receive next message", 
request.getRemoteAddr(), request.getRemotePort(), topic, subscription);
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index cb2e288a4..7df14a558 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -41,6 +41,8 @@
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededError;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBlockedQuotaExceededException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -85,7 +87,7 @@ public ProducerHandler(WebSocketService service, 
HttpServletRequest request, Ser
         this.numMsgsFailed = new LongAdder();
         this.publishLatencyStatsUSec = new 
StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
 
-        if (!authResult) {
+        if (!checkAuth(response)) {
             return;
         }
 
@@ -114,6 +116,8 @@ private static int getErrorCode(Exception e) {
             return HttpServletResponse.SC_BAD_REQUEST;
         } else if (e instanceof ProducerBusyException) {
             return HttpServletResponse.SC_CONFLICT;
+        } else if (e instanceof ProducerBlockedQuotaExceededError || e 
instanceof ProducerBlockedQuotaExceededException) {
+            return HttpServletResponse.SC_SERVICE_UNAVAILABLE;
         } else {
             return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
         }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 4d6c27185..d643df2b6 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -82,7 +82,7 @@ public ReaderHandler(WebSocketService service, 
HttpServletRequest request, Servl
         this.numMsgsDelivered = new LongAdder();
         this.numBytesDelivered = new LongAdder();
 
-        if (!authResult) {
+        if (!checkAuth(response)) {
             return;
         }
 
@@ -97,7 +97,8 @@ public ReaderHandler(WebSocketService service, 
HttpServletRequest request, Servl
             log.warn("[{}:{}] Failed in creating reader {} on topic {}", 
request.getRemoteAddr(),
                     request.getRemotePort(), subscription, topic, e);
             try {
-                
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to 
create reader");
+                
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
+                        "Failed to create reader: " + e.getMessage());
             } catch (IOException e1) {
                 log.warn("[{}:{}] Failed to send error: {}", 
request.getRemoteAddr(), request.getRemotePort(),
                         e1.getMessage(), e1);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to