k-pisey opened a new issue #14086:
URL: https://github.com/apache/pulsar/issues/14086


   **Describe the bug**
   I use the internal consumer passed to the consumer event listener callback 
`becameActive(Consumer<?> consumer, int i)` to consume messages from 
partition i.
   The number of received messages differs greatly from the number of 
produced messages.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   My test program is included below.
   
   Test steps:
   1. Start pulsar container
   2. Create partitioned topic with number of partitions equal to 1
   3. Subscribe to partitioned topic with failover subscription type and 
consumer event listener
   4. Wait until the Pulsar broker has notified the consumer event listener 
that an internal consumer became active on partition 0
   5. Start receiving messages via that consumer on another thread, which 
runs forever
   6. Create a partitioned producer, then produce messages to the topic
   
   **Expected behavior**
   After the last input message has been received, the number of received 
messages should be identical to the number of produced messages
   
   **Desktop (please complete the following information):**
    - OS: Mac OS
    - Java testcontainers
    - Intellij
   
   **Additional context**
   - pulsar image version: 2.9.1
   - pulsar-client version: 2.9.1
   - pulsar-client-admin version: 2.9.1
   - testcontainers version: 1.16.2
   
   ```java
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.admin.PulsarAdmin;
   import org.apache.pulsar.client.admin.PulsarAdminException;
   import org.apache.pulsar.client.api.*;
   import org.junit.jupiter.api.Test;
   import org.testcontainers.containers.PulsarContainer;
   import org.testcontainers.utility.DockerImageName;
   
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   @Slf4j
   public class InternalConsumerTest {
       final static String TOPIC = "my-topic";
   
       static void setup(String restUrl) throws PulsarClientException, 
PulsarAdminException {
           log.info("Start setup");
           try (PulsarAdmin admin = PulsarAdmin.builder()
                   .serviceHttpUrl(restUrl)
                   .build();
           ) {
               admin.topics().createPartitionedTopic(TOPIC, 1);
           }
           log.info("Setup finished");
       }
   
       static void subscribeToTopic(PulsarClient client, EventListener 
eventListener, String topic, String subscription) {
           try {
               client.newConsumer(Schema.STRING)
                       .topic(topic)
                       .subscriptionName(subscription)
                       .consumerEventListener(eventListener)
                       .subscriptionType(SubscriptionType.Failover)
                       .subscribe();
           } catch (PulsarClientException e) {
               log.error("Failed to instantiate consumer on topic: {}", TOPIC, 
e);
           }
       }
   
       @Test
       void test() throws InterruptedException, PulsarAdminException, 
PulsarClientException {
           DockerImageName PULSAR_IMAGE = 
DockerImageName.parse("apachepulsar/pulsar:2.9.1");
           try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)
           ) {
               pulsar.start();
               Thread.sleep(TimeUnit.SECONDS.toMillis(2));
               setup(pulsar.getHttpServiceUrl());
               try (PulsarClient client = PulsarClient.builder()
                       .serviceUrl(pulsar.getPulsarBrokerUrl())
                       .build()) {
                   Manager manager = new Manager();
                   EventListener eventListener = new EventListener(manager);
                   subscribeToTopic(client, eventListener, TOPIC, "sub-1");
                   while (!manager.consumingStarted.get()) {
                       Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                   }
                   // sleep another 1 sec after internal consumer started 
consuming
                   Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                   // create partitioned producer then produce messages to topic
                   Runnable producerTask = new Thread(producerTask(client));
                   producerTask.run();
                   // forever sleep
                   while (true) {
                       Thread.sleep(TimeUnit.MINUTES.toMillis(1));
                   }
               }
           }
       }
   
       static Runnable producerTask(PulsarClient client) {
           return () -> {
               try (Producer<String> producer = 
client.newProducer(Schema.STRING)
                       .topic(TOPIC)
                       
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                       .create()) {
                   log.info("Start producing");
                   String[] keys = {"1111", "2222", "3333"};
                   int k = 0;
                   for (int i = 0; i < 90; i++) {
                       if (k == 3) {
                           k = 0;
                       }
                       producer.newMessage()
                               .key(keys[k])
                               .value(String.valueOf(i))
                               .send();
                       k++;
                   }
                   
                 producer.newMessage()
                         .key(keys[0])
                         .value(String.valueOf(-1))
                         .send();
                   log.info("Producing finished");
               } catch (PulsarClientException e) {
                   log.info("Error", e);
               }
           };
       }
   
       static Runnable consumerTask(Consumer<?> consumer, AtomicBoolean 
consumingStarted) {
           return () -> {
               String s = "";
               int n = 0;
               log.info("Start receiving message from topic {}", 
consumer.getTopic());
               consumingStarted.set(true);
               while (!s.equals("-1")) { // if message not equal to last input 
message
                   try {
                       Message<?> msg = consumer.receive(1, TimeUnit.MINUTES);
                       if (msg != null) {
                           s = new String(msg.getData());
                           n++;
                           log.info("Received message: {}", s);
                           consumer.acknowledge(msg);
                       }
                   } catch (PulsarClientException e) {
                       log.error("Error", e);
                   }
               }
               log.info("Number of received messages: {}", n);
           };
       }
   
       static class EventListener implements ConsumerEventListener {
           private final Manager manager;
   
           EventListener(Manager manager) {
               this.manager = manager;
           }
   
           @Override
           public void becameActive(Consumer<?> consumer, int i) {
               log.info("Consumer {name: {}, hashcode: {}} became active on 
partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
               manager.run(consumer, i);
           }
   
           @Override
           public void becameInactive(Consumer<?> consumer, int i) {
               log.info("Consumer {name: {}, hashcode: {}} became inactive on 
partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
           }
       }
   
       static class Manager {
           Map<Integer, Thread> runningMap = new HashMap<>();
           AtomicBoolean consumingStarted = new AtomicBoolean(false);
   
           void run(Consumer<?> consumer, int i) {
               if (runningMap.containsKey(i)) {
                   return;
               }
               Thread thread = new Thread(consumerTask(consumer, 
consumingStarted));
               thread.start();
               runningMap.put(i, thread);
           }
       }
   }
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to