This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 03ade5c Throw ProducerBusy when producer with same name is already connected (#1174) 03ade5c is described below commit 03ade5c078b85111d2a70ad4b9a3fedaa7e7a959 Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Feb 5 18:51:18 2018 -0800 Throw ProducerBusy when producer with same name is already connected (#1174) --- .../broker/service/BrokerServiceException.java | 2 ++ .../broker/service/PersistentTopicE2ETest.java | 36 ++++++++++++++++++++++ .../pulsar/client/api/ProducerConfiguration.java | 4 +++ .../pulsar/client/api/PulsarClientException.java | 6 ++++ .../org/apache/pulsar/client/impl/ClientCnx.java | 2 ++ .../apache/pulsar/common/api/proto/PulsarApi.java | 3 ++ pulsar-common/src/main/proto/PulsarApi.proto | 2 ++ .../apache/pulsar/websocket/ProducerHandler.java | 30 +++++++++++++----- 8 files changed, 78 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 1127237..a8b4f35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -136,6 +136,8 @@ public class BrokerServiceException extends Exception { public static PulsarApi.ServerError getClientErrorCode(Throwable t) { if (t instanceof ServerMetadataException) { return PulsarApi.ServerError.MetadataError; + } else if (t instanceof NamingException) { + return PulsarApi.ServerError.ProducerBusy; } else if (t instanceof PersistenceException) { return PulsarApi.ServerError.PersistenceError; } else if (t instanceof ConsumerBusyException) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 93ce4b5..3ce76d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -18,6 +18,12 @@ */ package org.apache.pulsar.broker.service; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -26,6 +32,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeSet; @@ -39,6 +46,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -58,6 +68,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; import 
org.apache.pulsar.client.impl.MessageIdImpl; @@ -68,6 +79,8 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -1310,4 +1323,27 @@ public class PersistentTopicE2ETest extends BrokerTestBase { producer.close(); } + @Test + public void testCreateProducerWithSameName() throws Exception { + String topic = "persistent://prop/use/ns-abc/testCreateProducerWithSameName"; + + ProducerConfiguration conf = new ProducerConfiguration(); + conf.setProducerName("test-producer-a"); + + Producer p1 = pulsarClient.createProducer(topic, conf); + + try { + pulsarClient.createProducer(topic, conf); + fail("Should have thrown ProducerBusyException"); + } catch (ProducerBusyException e) { + // Expected + } + + p1.close(); + + // Now p2 should succeed + Producer p2 = pulsarClient.createProducer(topic, conf); + + p2.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index 9bde4c8..a6d2a55 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; @@ -88,6 +89,9 @@ public class 
ProducerConfiguration implements Serializable { * <p> * When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique * across all Pulsar's clusters. + * <p> + * If a producer with the same name is already connected to a particular topic, the + * {@link PulsarClient#createProducer(String)} operation will fail with {@link ProducerBusyException}. * * @param producerName * the custom name to use for the producer diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 5f98cb4..2bcd142 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -131,6 +131,12 @@ public class PulsarClientException extends IOException { } } + public static class ProducerBusyException extends PulsarClientException { + public ProducerBusyException(String msg) { + super(msg); + } + } + public static class ConsumerBusyException extends PulsarClientException { public ConsumerBusyException(String msg) { super(msg); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 85e2867..5ad4c66 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -527,6 +527,8 @@ public class ClientCnx extends PulsarHandler { return new PulsarClientException.AuthenticationException(errorMsg); case AuthorizationError: return new PulsarClientException.AuthorizationException(errorMsg); + case ProducerBusy: + return new PulsarClientException.ProducerBusyException(errorMsg); case ConsumerBusy: return new PulsarClientException.ConsumerBusyException(errorMsg); case MetadataError: diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index eef3651..9521594 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -70,6 +70,7 @@ public final class PulsarApi { ConsumerNotFound(13, 13), TooManyRequests(14, 14), TopicTerminatedError(15, 15), + ProducerBusy(16, 16), ; public static final int UnknownError_VALUE = 0; @@ -88,6 +89,7 @@ public final class PulsarApi { public static final int ConsumerNotFound_VALUE = 13; public static final int TooManyRequests_VALUE = 14; public static final int TopicTerminatedError_VALUE = 15; + public static final int ProducerBusy_VALUE = 16; public final int getNumber() { return value; } @@ -110,6 +112,7 @@ public final class PulsarApi { case 13: return ConsumerNotFound; case 14: return TooManyRequests; case 15: return TopicTerminatedError; + case 16: return ProducerBusy; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index faa894f..d5661fa 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -109,6 +109,8 @@ enum ServerError { ConsumerNotFound = 13; // Consumer not found TooManyRequests = 14; // Error with too many simultaneously request TopicTerminatedError = 15; // The topic has been terminated + + ProducerBusy = 16; // Producer with same name is already connected } enum AuthMethod { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index 13758b8..e551885 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.ProducerConfiguration.HashingScheme; import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException; import org.apache.pulsar.common.naming.DestinationName; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ProducerAck; @@ -99,14 +100,11 @@ public class ProducerHandler extends AbstractWebSocketHandler { request.getRemotePort(), topic); } } catch (Exception e) { - log.warn("[{}:{}] Failed in creating producer on topic {}", request.getRemoteAddr(), - request.getRemotePort(), topic, e); - boolean configError = e instanceof IllegalArgumentException; - int errorCode = configError ? HttpServletResponse.SC_BAD_REQUEST - : HttpServletResponse.SC_INTERNAL_SERVER_ERROR; - String errorMsg = configError ? 
"Invalid query-param " + e.getMessage() : "Failed to create producer"; + log.warn("[{}:{}] Failed in creating producer on topic {}: {}", request.getRemoteAddr(), + request.getRemotePort(), topic, e.getMessage()); + try { - response.sendError(errorCode, errorMsg); + response.sendError(getErrorCode(e), getErrorMessage(e)); } catch (IOException e1) { log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(), e1.getMessage(), e1); @@ -114,6 +112,24 @@ public class ProducerHandler extends AbstractWebSocketHandler { } } + private static int getErrorCode(Exception e) { + if (e instanceof IllegalArgumentException) { + return HttpServletResponse.SC_BAD_REQUEST; + } else if (e instanceof ProducerBusyException) { + return HttpServletResponse.SC_CONFLICT; + } else { + return HttpServletResponse.SC_INTERNAL_SERVER_ERROR; + } + } + + private static String getErrorMessage(Exception e) { + if (e instanceof IllegalArgumentException) { + return "Invalid query params: " + e.getMessage(); + } else { + return "Failed to create producer: " + e.getMessage(); + } + } + @Override public void close() throws IOException { if (producer != null) { -- To stop receiving notification emails like this one, please contact mme...@apache.org.