This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f1df048 Fixed resetting of producer sequence id counter after
receiving SendError before any message was successfully published (#2047)
f1df048 is described below
commit f1df0484d1f094d9bd48db18d28e1a1ef233f6db
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 28 15:21:25 2018 -0700
Fixed resetting of producer sequence id counter after receiving SendError
before any message was successfully published (#2047)
---
.../broker/service/BrokerBkEnsemblesTests.java | 8 +-
.../pulsar/broker/service/RackAwareTest.java | 4 +-
.../client/impl/SequenceIdWithErrorTest.java | 102 +++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 5 +-
4 files changed, 112 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 173fb49..4ecdae4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -60,11 +60,11 @@ import com.google.common.collect.Sets;
*/
public class BrokerBkEnsemblesTests {
protected static int BROKER_SERVICE_PORT = 16650;
- PulsarService pulsar;
+ protected PulsarService pulsar;
ServiceConfiguration config;
URL adminUrl;
- PulsarAdmin admin;
+ protected PulsarAdmin admin;
LocalBookkeeperEnsemble bkEnsemble;
@@ -83,7 +83,7 @@ public class BrokerBkEnsemblesTests {
}
@BeforeMethod
- void setup() throws Exception {
+ protected void setup() throws Exception {
try {
// start local bookie and zookeeper
bkEnsemble = new LocalBookkeeperEnsemble(numberOfBookies,
ZOOKEEPER_PORT, 5001);
@@ -118,7 +118,7 @@ public class BrokerBkEnsemblesTests {
}
@AfterMethod
- void shutdown() throws Exception {
+ protected void shutdown() throws Exception {
try {
admin.close();
pulsar.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index 215d350..7499908 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -52,7 +52,7 @@ public class RackAwareTest extends BrokerBkEnsemblesTests {
@SuppressWarnings("deprecation")
@BeforeClass
- void setup() throws Exception {
+ protected void setup() throws Exception {
super.setup();
// Start bookies with specific racks
@@ -83,7 +83,7 @@ public class RackAwareTest extends BrokerBkEnsemblesTests {
}
@AfterClass
- void shutdown() throws Exception {
+ protected void shutdown() throws Exception {
super.shutdown();
for (BookieServer bs : bookies) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
new file mode 100644
index 0000000..21a65a6
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Collections;
+
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.pulsar.broker.ManagedLedgerClientFactory;
+import org.apache.pulsar.broker.service.BrokerBkEnsemblesTests;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Checks that the sequence ids seen by a consumer start at 0 and stay contiguous when the producer
+ * runs into a send error before any message has been successfully published.
+ *
+ * <p>Reuses the broker/bookie fixture of {@link BrokerBkEnsemblesTests}; the test cases inherited from
+ * that class are overridden with empty bodies at the bottom of this class.
+ */
+public class SequenceIdWithErrorTest extends BrokerBkEnsemblesTests {
+    /**
+     * Delegates to the parent fixture's setup before each test method.
+     */
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /**
+     * Delegates to the parent fixture's shutdown after each test method.
+     *
+     * <p>NOTE(review): the parent's {@code shutdown()} is itself annotated {@code @AfterMethod}, so TestNG
+     * may invoke it in addition to this call -- confirm whether the repeated shutdown is intended.
+     */
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.shutdown();
+    }
+
+    /**
+     * Test that sequence id from a producer is correct when there are send errors
+     *
+     * @throws Exception if the namespace, client, consumer or producer cannot be created, or a
+     *             send/receive call fails
+     */
+    @Test
+    public void testCheckSequenceId() throws Exception {
+        admin.namespaces().createNamespace("prop/my-test", Collections.singleton("usc"));
+
+        String topicName = "prop/my-test/my-topic";
+        String serviceUrl = "pulsar://localhost:" + BROKER_SERVICE_PORT;
+        final int numMessages = 10;
+
+        // try-with-resources closes the client even when a send or an assertion below fails,
+        // so a failing run does not leave the client open
+        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build()) {
+            // Create consumer (before anything is published)
+            Consumer<String> consumer = client.newConsumer(Schema.STRING).topic(topicName)
+                    .subscriptionName("sub").subscribe();
+
+            // Fence the topic by opening the ManagedLedger for the topic outside the Pulsar broker. This will
+            // cause the broker to fail subsequent send operation and it will trigger a recover
+            ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory(pulsar.getConfiguration(),
+                    pulsar.getZkClient(), pulsar.getBookKeeperClientFactory());
+            try {
+                ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
+                ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
+                ml.close();
+            } finally {
+                // Release the out-of-band factory even if opening or closing the ledger fails
+                clientFactory.close();
+            }
+
+            // Create a producer
+            Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
+
+            for (int i = 0; i < numMessages; i++) {
+                producer.send("Hello-" + i);
+            }
+
+            // Every message must come back in publish order carrying sequence id 0..numMessages-1
+            for (int i = 0; i < numMessages; i++) {
+                Message<String> msg = consumer.receive();
+                assertEquals(msg.getValue(), "Hello-" + i);
+                assertEquals(msg.getSequenceId(), i);
+                consumer.acknowledge(msg);
+            }
+        }
+    }
+
+    @Override
+    public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception {
+        // Ignore test
+    }
+
+    @Override
+    public void testSkipCorruptDataLedger() throws Exception {
+        // Ignore test
+    }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index b5962ac..5d1ac41 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -873,7 +873,10 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
this.producerName = producerName;
}
- if (this.lastSequenceIdPublished == -1 &&
conf.getInitialSequenceId() == null) {
+ if (this.msgIdGenerator == 0 &&
conf.getInitialSequenceId() == null) {
+ // Only update sequence id generator if it wasn't
already modified. That means we only want
+ // to update the id generator the first time the
producer gets established, and ignore the
+ // sequence id sent by broker in subsequent
producer reconnects
this.lastSequenceIdPublished = lastSequenceId;
this.msgIdGenerator = lastSequenceId + 1;
}