merlimat closed pull request #1174: Throw ProducerBusy when producer with same 
name is already connected
URL: https://github.com/apache/incubator-pulsar/pull/1174
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 2a6a195df..908fa1a62 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -130,6 +130,8 @@ public TooManyRequestsException(String msg) {
     public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
         if (t instanceof ServerMetadataException) {
             return PulsarApi.ServerError.MetadataError;
+        } else if (t instanceof NamingException) {
+            return PulsarApi.ServerError.ProducerBusy;
         } else if (t instanceof PersistenceException) {
             return PulsarApi.ServerError.PersistenceError;
         } else if (t instanceof ConsumerBusyException) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 93ce4b5c4..3ce76d17f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -18,6 +18,12 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -26,6 +32,7 @@
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeSet;
@@ -39,6 +46,9 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
@@ -58,6 +68,7 @@
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -68,6 +79,8 @@
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1310,4 +1323,27 @@ public void testMessageReplay() throws Exception {
         producer.close();
     }
 
+    @Test
+    public void testCreateProducerWithSameName() throws Exception {
+        String topic = 
"persistent://prop/use/ns-abc/testCreateProducerWithSameName";
+
+        ProducerConfiguration conf = new ProducerConfiguration();
+        conf.setProducerName("test-producer-a");
+
+        Producer p1 = pulsarClient.createProducer(topic, conf);
+
+        try {
+            pulsarClient.createProducer(topic, conf);
+            fail("Should have thrown ProducerBusyException");
+        } catch (ProducerBusyException e) {
+            // Expected
+        }
+
+        p1.close();
+
+        // Now p2 should succeed
+        Producer p2 = pulsarClient.createProducer(topic, conf);
+
+        p2.close();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index 678e3d56d..50c15fb02 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -27,6 +27,7 @@
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 
@@ -88,6 +89,9 @@ public String getProducerName() {
      * <p>
      * When specifying a name, it is app to the user to ensure that, for a 
given topic, the producer name is unique
      * across all Pulsar's clusters.
+     * <p>
+     * If a producer with the same name is already connected to a particular 
topic, the
+     * {@link PulsarClient#createProducer(String)} operation will fail with 
{@link ProducerBusyException}.
      *
      * @param producerName
      *            the custom name to use for the producer
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 5f98cb40c..2bcd1425f 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -131,6 +131,12 @@ public BrokerMetadataException(String msg) {
         }
     }
 
+    public static class ProducerBusyException extends PulsarClientException {
+        public ProducerBusyException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class ConsumerBusyException extends PulsarClientException {
         public ConsumerBusyException(String msg) {
             super(msg);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 85e286745..5ad4c66f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -527,6 +527,8 @@ private PulsarClientException 
getPulsarClientException(ServerError error, String
             return new PulsarClientException.AuthenticationException(errorMsg);
         case AuthorizationError:
             return new PulsarClientException.AuthorizationException(errorMsg);
+        case ProducerBusy:
+            return new PulsarClientException.ProducerBusyException(errorMsg);
         case ConsumerBusy:
             return new PulsarClientException.ConsumerBusyException(errorMsg);
         case MetadataError:
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index eef365188..9521594a6 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -70,6 +70,7 @@ private CompressionType(int index, int value) {
     ConsumerNotFound(13, 13),
     TooManyRequests(14, 14),
     TopicTerminatedError(15, 15),
+    ProducerBusy(16, 16),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -88,6 +89,7 @@ private CompressionType(int index, int value) {
     public static final int ConsumerNotFound_VALUE = 13;
     public static final int TooManyRequests_VALUE = 14;
     public static final int TopicTerminatedError_VALUE = 15;
+    public static final int ProducerBusy_VALUE = 16;
     
     
     public final int getNumber() { return value; }
@@ -110,6 +112,7 @@ public static ServerError valueOf(int value) {
         case 13: return ConsumerNotFound;
         case 14: return TooManyRequests;
         case 15: return TopicTerminatedError;
+        case 16: return ProducerBusy;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index faa894f8c..d5661fa06 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -109,6 +109,8 @@ enum ServerError {
     ConsumerNotFound = 13; // Consumer not found
     TooManyRequests = 14; // Error with too many simultaneously request
     TopicTerminatedError = 15; // The topic has been terminated
+
+    ProducerBusy         = 16; // Producer with same name is already connected
 }
 
 enum AuthMethod {
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 990f2cd18..a34f041e4 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -40,6 +40,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.common.naming.DestinationName;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAck;
@@ -98,14 +99,11 @@ public ProducerHandler(WebSocketService service, 
HttpServletRequest request, Ser
                         request.getRemotePort(), topic);
             }
         } catch (Exception e) {
-            log.warn("[{}:{}] Failed in creating producer on topic {}", 
request.getRemoteAddr(),
-                    request.getRemotePort(), topic, e);
-            boolean configError = e instanceof IllegalArgumentException;
-            int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST
-                    : HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
-            String errorMsg = configError ? "Invalid query-param " + 
e.getMessage() : "Failed to create producer";
+            log.warn("[{}:{}] Failed in creating producer on topic {}: {}", 
request.getRemoteAddr(),
+                    request.getRemotePort(), topic, e.getMessage());
+
             try {
-                response.sendError(errorCode, errorMsg);
+                response.sendError(getErrorCode(e), getErrorMessage(e));
             } catch (IOException e1) {
                 log.warn("[{}:{}] Failed to send error: {}", 
request.getRemoteAddr(), request.getRemotePort(),
                         e1.getMessage(), e1);
@@ -113,6 +111,24 @@ public ProducerHandler(WebSocketService service, 
HttpServletRequest request, Ser
         }
     }
 
+    private static int getErrorCode(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return HttpServletResponse.SC_BAD_REQUEST;
+        } else if (e instanceof ProducerBusyException) {
+            return HttpServletResponse.SC_CONFLICT;
+        } else {
+            return HttpServletResponse.SC_INTERNAL_SERVER_ERROR;
+        }
+    }
+
+    private static String getErrorMessage(Exception e) {
+        if (e instanceof IllegalArgumentException) {
+            return "Invalid query params: " + e.getMessage();
+        } else {
+            return "Failed to create producer: " + e.getMessage();
+        }
+    }
+
     @Override
     public void close() throws IOException {
         if (producer != null) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to