This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fbdf038 [pulsar-broker]Fix: client-producer can't connect due to
failed producer-future on cnx (#4138)
fbdf038 is described below
commit fbdf0383492277fbb0589a5485e2171d1f3c3568
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Fri Apr 26 09:08:17 2019 -0700
[pulsar-broker]Fix: client-producer can't connect due to failed
producer-future on cnx (#4138)
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 83861b1..aebcb30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -100,10 +100,8 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
-import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaInfoUtil;
-import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -851,8 +849,13 @@ public class ServerCnx extends PulsarHandler {
// until the previous producer creation
// request
// either complete or fails.
- ServerError error =
!existingProducerFuture.isDone() ? ServerError.ServiceNotReady
- : getErrorCode(existingProducerFuture);
+ ServerError error = null;
+ if(!existingProducerFuture.isDone()) {
+ error = ServerError.ServiceNotReady;
+ }else {
+ error =
getErrorCode(existingProducerFuture);
+ producers.remove(producerId,
producerFuture);
+ }
log.warn("[{}][{}] Producer with id {} is
already present on the connection", remoteAddress,
producerId, topicName);
ctx.writeAndFlush(Commands.newError(requestId,
error,
@@ -899,6 +902,8 @@ public class ServerCnx extends PulsarHandler {
schemaVersionFuture = topic.addSchema(schema);
} else {
schemaVersionFuture =
topic.hasSchema().thenCompose((hasSchema) -> {
+ log.info("[{}]-{} {} configured with
schema {}", remoteAddress, producerId,
+ topicName, hasSchema);
CompletableFuture<SchemaVersion>
result = new CompletableFuture<>();
if (hasSchema &&
schemaValidationEnforced) {
result.completeExceptionally(new
IncompatibleSchemaException(