Lanayx commented on issue #5877: Shared subscription doesn't work well with 
partitioned topic in 2.4.2
URL: https://github.com/apache/pulsar/issues/5877#issuecomment-566601841
 
 
   @codelipenghui I don't think so. I'm on Windows, so I'm only able to run Pulsar 
as a Docker or Kubernetes image. This is the Java code that I used to test those 
consumers, if it helps (sorry for the quality, I'm a newbie in Java). 
   ```
   package jtest;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   import java.time.Duration;
   import java.time.Instant;
   import java.util.ArrayList;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.function.Function;
   import java.util.logging.LogManager;
   
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.Reader;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.apache.pulsar.client.impl.TopicMessageIdImpl;
   
   /**
    * Hello world!
    */
   public final class App {
       private static Instant start;
   
       private App() {
       }
   
       public static void run(Consumer consumer, int number) {
   
           // Wait for a message
           CompletableFuture<Message> msgF = consumer.receiveAsync();
           msgF.thenAccept(msg -> {
               // try {
               // Do something with the message
               TopicMessageIdImpl id = (TopicMessageIdImpl) msg.getMessageId();
               System.out.printf("Message received: %s %s %s\n", msg.getKey(), 
new String(msg.getData()), id.getTopicPartitionName());
   
               // Acknowledge the message so that it can be deleted by the 
message broker
               consumer.acknowledgeAsync(msg).thenAccept(action -> {
                   if (number < 5000) {
                       run(consumer, number + 1);
                   } else {
                       Instant end = Instant.now();
                       long time = Duration.between(start, end).toMillis();
                       System.out.println("End! " + time);
                   }
   
               }).exceptionally(x -> {
                   System.out.println("Exception!");
                   return x;
               });
           });
   
       }
   
       /**
        * Says hello to the world.
        *
        * @param args The arguments of the program.
        * @throws IOException
        */
       public static void main(String[] args) throws IOException {
   
           System.out.println("Hello World!");
           try {
               PulsarClient client = 
PulsarClient.builder().serviceUrl("pulsar://my-pulsar-cluster:31002").build();
               Consumer consumer = client.newConsumer()
                   .topic("persistent://public/default/partitioned2")
                   .subscriptionName("test-subscription3")
                   
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
               Consumer consumer2 = client.newConsumer()
                   .topic("persistent://public/default/partitioned2")
                   .subscriptionName("test-subscription3")
                   
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                   .subscriptionType(SubscriptionType.Shared)
                   .subscribe();
   
               System.out.println("Consumer created! ");
   
               run(consumer,0);
               run(consumer2,0);
   
                } catch (PulsarClientException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
       }
   }
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to