sijie closed pull request #1818: Fixed race condition in consumer event listener
URL: https://github.com/apache/incubator-pulsar/pull/1818
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, the diff is included
below; GitHub would not otherwise display it once the merge is done:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 7050f1dd8b..9ab7b877c0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -38,7 +38,6 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
-import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
@@ -199,6 +198,7 @@ public synchronized void internalReadEntriesComplete(final 
List<Entry> entries,
             entries.forEach(Entry::release);
             cursor.rewind();
             if (currentConsumer != null) {
+                notifyActiveConsumerChanged(currentConsumer);
                 readMoreEntries(currentConsumer);
             }
         } else {
@@ -458,6 +458,9 @@ private synchronized void 
internalReadEntriesFailed(ManagedLedgerException excep
                         if (log.isDebugEnabled()) {
                             log.debug("[{}-{}] Retrying read operation", name, 
c);
                         }
+                        if (currentConsumer != c) {
+                            notifyActiveConsumerChanged(currentConsumer);
+                        }
                         readMoreEntries(currentConsumer);
                     } else {
                         log.info("[{}-{}] Skipping read retry: Current 
Consumer {}, havePendingRead {}", name, c,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index 6c61734c2f..3e293afc61 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -24,6 +24,9 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -53,9 +56,6 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class PersistentFailoverE2ETest extends BrokerTestBase {
 
     @BeforeClass
@@ -70,7 +70,7 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
-    private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 2000;
+    private static final int CONSUMER_ADD_OR_REMOVE_WAIT_TIME = 100;
 
     private static class TestConsumerStateEventListener implements 
ConsumerEventListener {
 
@@ -100,12 +100,16 @@ private void 
verifyConsumerNotReceiveAnyStateChanges(TestConsumerStateEventListe
     }
 
     private void verifyConsumerActive(TestConsumerStateEventListener listener, 
int partitionId) throws Exception {
-        assertEquals(partitionId, listener.activeQueue.take().intValue());
+        Integer pid = listener.activeQueue.poll(10, TimeUnit.SECONDS);
+        assertNotNull(pid);
+        assertEquals(partitionId, pid.intValue());
         assertNull(listener.inActiveQueue.poll());
     }
 
     private void verifyConsumerInactive(TestConsumerStateEventListener 
listener, int partitionId) throws Exception {
-        assertEquals(partitionId, listener.inActiveQueue.take().intValue());
+        Integer pid = listener.inActiveQueue.poll(10, TimeUnit.SECONDS);
+        assertNotNull(pid);
+        assertEquals(partitionId, pid.intValue());
         assertNull(listener.activeQueue.poll());
     }
 
@@ -141,7 +145,7 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
 
         // 1. two consumers on the same subscription
         ConsumerBuilder<byte[]> consumerBulder1 = 
consumerBuilder.clone().consumerName("1")
-                .consumerEventListener(listener1).acknowledgmentGroupTime(0, 
TimeUnit.SECONDS);
+                .consumerEventListener(listener1);
         Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
         Consumer<byte[]> consumer2 = 
consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
                 .subscribe();
@@ -177,7 +181,7 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
 
         // 3. consumer1 should have all the messages while consumer2 should 
have no messages
         Message<byte[]> msg = null;
-        Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
         for (int i = 0; i < numMsgs; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -222,7 +226,7 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
             consumer2.acknowledge(msg);
         }
-        Assert.assertNull(consumer2.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
 
         rolloverPerIntervalStats();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -250,7 +254,7 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
             Assert.assertEquals(new String(msg.getData()), "my-message-" + i);
             consumer1.acknowledge(msg);
         }
-        Assert.assertNull(consumer1.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer1.receive(100, TimeUnit.MILLISECONDS));
 
         rolloverPerIntervalStats();
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
@@ -277,7 +281,7 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
 
         verifyConsumerInactive(listener3, -1);
 
-        Assert.assertNull(consumer3.receive(1, TimeUnit.SECONDS));
+        Assert.assertNull(consumer3.receive(100, TimeUnit.MILLISECONDS));
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer1.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -299,7 +303,6 @@ public void testSimpleConsumerEventsWithoutPartition() 
throws Exception {
 
         // 9. unsubscribe allowed if there is a lone consumer
         consumer1.close();
-        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
         consumer2.close();
         Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
         try {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to