stevenwangnarvar opened a new issue #4589: In Go client 2.3.2, can't get 
re-delivered message after add AckTimeout in ConsumerOptions
URL: https://github.com/apache/pulsar/issues/4589
 
 
   **Describe the bug**
   In the Go client, after I set an ack timeout on the consumer, messages that 
I receive but do not ack are never re-delivered.
   
   **To Reproduce**
   ```
   package main
   
   import (
        "context"
        "fmt"
        "log"
        "time"
   
        "github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
        topic := "persistent://public/default/some_partition_topic"
   
        // Instantiate a Pulsar client
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: "pulsar://localhost:6650",
        })
   
        if err != nil {
                log.Fatal(err)
        }
   
        // Use the client to instantiate a producer
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
                Topic: topic,
        })
   
        if err != nil {
                log.Fatal(err)
        }
   
        // Use the client object to instantiate a consumer
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                Topic:            topic,
                SubscriptionName: "subName",
                Type:             pulsar.Shared,
                AckTimeout:       60 * time.Second,
        })
   
        if err != nil {
                log.Fatal(err)
        }
   
        ctx := context.Background()
        // Send 5 messages synchronously
        for i := 0; i < 5; i++ {
                // Create a message
                msg := pulsar.ProducerMessage{
                        Payload: []byte(fmt.Sprintf("message-%d", i)),
                }
                // Attempt to send the message
                if err := producer.Send(ctx, msg); err != nil {
                        log.Fatal(err)
                }
        }
   
        // Listen indefinitely on the topic
        for {
                msg, err := consumer.Receive(ctx)
                if err != nil {
                        log.Fatal(err)
                }
                // Print the receive time and payload
                fmt.Println(time.Now().Format(time.RFC850) + " consume " + string(msg.Payload()))
                // Intentionally not acked, so the ack timeout should trigger re-delivery
                //consumer.Ack(msg)
        }
   }
   
   ```
   
   **Expected behavior**
   Unacked messages should be re-delivered to the consumer once the ack 
timeout (60 seconds in the code above) expires.
   
   **Screenshots**
   ```
   2019/06/25 13:40:55.168 c_client.go:68: [info] INFO  | PartitionedConsumerImpl:246 | Successfully Subscribed to Partitioned Topic - persistent://dev/genericfileproc/npulsar_test_input_topic with - 4 Partitions.
   Tuesday, 25-Jun-19 13:40:55 NZST consume message-0
   Tuesday, 25-Jun-19 13:40:55 NZST consume message-1
   Tuesday, 25-Jun-19 13:40:55 NZST consume message-2
   Tuesday, 25-Jun-19 13:40:55 NZST consume message-3
   Tuesday, 25-Jun-19 13:40:55 NZST consume message-4
   ```
   
   **Desktop (please complete the following information):**
    - OS: Linux
   
   **Additional context**
   With the Java client, I do get the re-delivered messages:
   ```
   /Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/bin/java 
"-javaagent:/Applications/IntelliJ 
IDEA.app/Contents/lib/idea_rt.jar=52644:/Applications/IntelliJ 
IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath 
/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/lib/dt.jar:/Library/Java/Jav
aVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_212.jdk/Contents/Home/lib/tools.jar:/Users/stevenwang/Projects/Java/target/classes:/Users/stevenwang/.m2/repository/org/apache/pulsar/pulsar-client/2.3.2/pulsar-client-2.3.2.jar:/Users/stevenwang/.m2/repository/org/apache/pulsar/pulsar-client-api/2.3.2/pulsar-client-api-2.3.2.jar:/Users/stevenwang/.m2/repository/org/apache/pulsar/protobuf-shaded/2.1.0-incubating/protobuf-shaded-2.1.0-incubating.jar:/Users/stevenwang/.m2/repository/org/lz4/lz4-java/1.5.0/lz4-java-1.5.0.jar:/Users/stevenwang/.m2/repository/com/github/luben/zstd-jni/1.3.7-3/zstd-jni-1.3.7-3.jar:/Users/stevenwang/.m2/repository/org/bouncycastle/bcpkix-jdk15on/1.60/bcpkix-jdk15on-1.60.jar:/Users/stevenwang/.m2/repository/org/bouncycastle/bcprov-jdk15on/1.60/bcprov-jdk15on-1.60.jar:/Users/stevenwang/.m2/repository/com/sun/activation/javax.activation/1.2.0/javax.activation-1.2.0.jar:/Users/stevenwang/.m2/repository/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar:/Users/stevenwang/.m2/repository/javax/validation/validation-api/1.1.0.Final/validation-api-1.1.0.Final.jar
 UnackedTrackerIssue
   SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
   SLF4J: Defaulting to no-operation (NOP) logger implementation
   SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
   2019-06-25T01:42:31.590Z consume java 0
   2019-06-25T01:42:32.596Z consume java 1
   2019-06-25T01:42:33.608Z consume java 2
   2019-06-25T01:42:34.616Z consume java 3
   2019-06-25T01:42:35.628Z consume java 4
   2019-06-25T01:43:12.076Z consume java 0
   2019-06-25T01:43:13.076Z consume java 1
   2019-06-25T01:43:14.076Z consume java 2
   2019-06-25T01:43:15.082Z consume java 3
   2019-06-25T01:43:16.079Z consume java 4
   2019-06-25T01:43:53.122Z consume java 0
   2019-06-25T01:43:54.122Z consume java 1
   2019-06-25T01:43:55.123Z consume java 2
   2019-06-25T01:43:56.125Z consume java 3
   2019-06-25T01:43:57.126Z consume java 4
   2019-06-25T01:44:34.172Z consume java 0
   2019-06-25T01:44:35.173Z consume java 1
   2019-06-25T01:44:36.175Z consume java 2
   2019-06-25T01:44:37.176Z consume java 3
   ```
   
