sijie closed pull request #1818: Fixed race condition in consumer event listener
URL: https://github.com/apache/incubator-pulsar/pull/1818
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 7050f1dd8b..9ab7b877c0 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,6 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -199,6 +198,7 @@ public synchronized void internalReadEntriesComplete(final
List<Entry> entries,
entries.forEach(Entry::release);
cursor.rewind();
if (currentConsumer != null) {
+ notifyActiveConsumerChanged(currentConsumer);
readMoreEntries(currentConsumer);
}
} else {
@@ -458,6 +458,9 @@ private synchronized void
internalReadEntriesFailed(ManagedLedgerException excep
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Retrying read operation", name,
c);
}
+ if (currentConsumer != c) {
+ notifyActiveConsumerChanged(currentConsumer);
+ }
readMoreEntries(currentConsumer);
} else {
log.info("[{}-{}] Skipping read retry: Current
Consumer {}, havePendingRead {}", name, c,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 6c61734c2f..3e293afc61 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -24,6 +24,9 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -53,9 +56,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
public class PersistentFailoverE2ETest extends BrokerTestBase {
@BeforeClass
@@ -70,7 +70,7 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}
- private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000;
+ private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;
private static class TestConsumerStateEventListener implements
ConsumerEventListener {
@@ -100,12 +100,16 @@ private void
verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListe
}
private void verifyConsumerActive(TestConsumerStateEventListener listener,
int partitionId) throws Exception {
- assertEquals(partitionId, listener.activeQueue.take().intValue());
+ Integer pid = listener.activeQueue.poll(10, TimeUnit.SECONDS);
+ assertNotNull(pid);
+ assertEquals(partitionId, pid.intValue());
assertNull(listener.inActiveQueue.poll());
}
private void verifyConsumerInactive(TestConsumerStateEventListener
listener, int partitionId) throws Exception {
- assertEquals(partitionId, listener.inActiveQueue.take().intValue());
+ Integer pid = listener.inActiveQueue.poll(10, TimeUnit.SECONDS);
+ assertNotNull(pid);
+ assertEquals(partitionId, pid.intValue());
assertNull(listener.activeQueue.poll());
}
@@ -141,7 +145,7 @@ public void testSimpleConsumerEventsWithoutPartition()
throws Exception {
// 1. two consumers on the same subscription
ConsumerBuilder<byte[]> consumerBulder1 =
consumerBuilder.clone().consumerName("1")
- .consumerEventListener(listener1).acknowledgmentGroupTime(0,
TimeUnit.SECONDS);
+ .consumerEventListener(listener1);
Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
Consumer<byte[]> consumer2 =
consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
.subscribe();
@@ -177,7 +181,7 @@ public void testSimpleConsumerEventsWithoutPartition()
throws Exception {
// 3. consumer1 should have all the messages while consumer2 should
have no messages
Message<byte[]> msg = null;
- Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
for (int i = 0; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
@@ -222,7 +226,7 @@ public void testSimpleConsumerEventsWithoutPartition()
throws Exception {
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer2.acknowledge(msg);
}
- Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -250,7 +254,7 @@ public void testSimpleConsumerEventsWithoutPartition()
throws Exception {
Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
consumer1.acknowledge(msg);
}
- Assert.assertNull(consumer1.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS));
rolloverPerIntervalStats();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -277,7 +281,7 @@ public void testSimpleConsumerEventsWithoutPartition()
throws Exception {
verifyConsumerInactive(listener3, -1);
- Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS));
+ Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS));
for (int i = 5; i < numMsgs; i++) {
msg = consumer1.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
@@ -299,7 +303,6 @@ public void testSimpleConsumerEventsWithoutPartition()
throws Exception {
// 9. unsubscribe allowed if there is a lone consumer
consumer1.close();
- Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
consumer2.close();
Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
try {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services