This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f1df048  Fixed resetting of producer sequence id counter after receiving SendError before any message was successfully published (#2047)
f1df048 is described below

commit f1df0484d1f094d9bd48db18d28e1a1ef233f6db
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 28 15:21:25 2018 -0700

    Fixed resetting of producer sequence id counter after receiving SendError before any message was successfully published (#2047)
---
 .../broker/service/BrokerBkEnsemblesTests.java     |   8 +-
 .../pulsar/broker/service/RackAwareTest.java       |   4 +-
 .../client/impl/SequenceIdWithErrorTest.java       | 102 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |   5 +-
 4 files changed, 112 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 173fb49..4ecdae4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -60,11 +60,11 @@ import com.google.common.collect.Sets;
  */
 public class BrokerBkEnsemblesTests {
     protected static int BROKER_SERVICE_PORT = 16650;
-    PulsarService pulsar;
+    protected PulsarService pulsar;
     ServiceConfiguration config;
 
     URL adminUrl;
-    PulsarAdmin admin;
+    protected PulsarAdmin admin;
 
     LocalBookkeeperEnsemble bkEnsemble;
 
@@ -83,7 +83,7 @@ public class BrokerBkEnsemblesTests {
     }
 
     @BeforeMethod
-    void setup() throws Exception {
+    protected void setup() throws Exception {
         try {
             // start local bookie and zookeeper
             bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies, ZOOKEEPER_PORT, 5001);
@@ -118,7 +118,7 @@ public class BrokerBkEnsemblesTests {
     }
 
     @AfterMethod
-    void shutdown() throws Exception {
+    protected void shutdown() throws Exception {
         try {
             admin.close();
             pulsar.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index 215d350..7499908 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -52,7 +52,7 @@ public class RackAwareTest extends BrokerBkEnsemblesTests {
 
     @SuppressWarnings("deprecation")
     @BeforeClass
-    void setup() throws Exception {
+    protected void setup() throws Exception {
         super.setup();
 
         // Start bookies with specific racks
@@ -83,7 +83,7 @@ public class RackAwareTest extends BrokerBkEnsemblesTests {
     }
 
     @AfterClass
-    void shutdown() throws Exception {
+    protected void shutdown() throws Exception {
         super.shutdown();
 
         for (BookieServer bs : bookies) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
new file mode 100644
index 0000000..21a65a6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.broker.ManagedLedgerClientFactory;
+import org.apache.pulsar.broker.service.BrokerBkEnsemblesTests;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SequenceIdWithErrorTest extends BrokerBkEnsemblesTests {
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.shutdown();
+    }
+
+    /**
+     * Test that sequence id from a producer is correct when there are send errors
+     */
+    @Test
+    public void testCheckSequenceId() throws Exception {
+        admin.namespaces().createNamespace("prop/my-test", Collections.singleton("usc"));
+
+        String topicName = "prop/my-test/my-topic";
+        int N = 10;
+
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:" + BROKER_SERVICE_PORT).build();
+
+        // Create consumer
+        Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub")
+                .subscribe();
+
+        // Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will cause the
+        // broker to fail subsequent send operation and it will trigger a recover
+        ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(),
+                pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
+        ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
+        ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
+        ml.close();
+        clientFactory.close();
+
+        // Create a producer
+        Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
+
+        for (int i = 0; i < N; i++) {
+            producer.send("Hello-" + i);
+        }
+
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals(msg.getValue(), "Hello-" + i);
+            assertEquals(msg.getSequenceId(), i);
+            consumer.acknowledge(msg);
+        }
+
+        client.close();
+    }
+
+    @Override
+    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
+        // Ignore test
+    }
+
+    @Override
+    public void testSkipCorruptDataLedger() throws Exception {
+        // Ignore test
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b5962ac..5d1ac41 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -873,7 +873,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                             this.producerName = producerName;
                         }
 
-                        if (this.lastSequenceIdPublished == -1 && conf.getInitialSequenceId() == null) {
+                        if (this.msgIdGenerator == 0 && conf.getInitialSequenceId() == null) {
+                            // Only update sequence id generator if it wasn't already modified. That means we only want
+                            // to update the id generator the first time the producer gets established, and ignore the
+                            // sequence id sent by broker in subsequent producer reconnects
                             this.lastSequenceIdPublished = lastSequenceId;
                             this.msgIdGenerator = lastSequenceId + 1;
                         }

Reply via email to