flowchartsman opened a new issue, #20449:
URL: https://github.com/apache/pulsar/issues/20449

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) 
and found nothing similar.
   
   
   ### Version
   
   The code in `instance.go` which handles incoming messages and idle timeout 
basically boils down to this:
   
   ```go
        idleDuration := getIdleTimeout(time.Millisecond * 
time.Duration(gi.context.instanceConf.killAfterIdleMs))
        idleTimer := time.NewTimer(idleDuration)
        defer idleTimer.Stop()
   
   CLOSE:
        for {
                idleTimer.Reset(idleDuration)
                select {
                case cm := <-channel:
                        // handle message
                case <-idleTimer.C:
                        close(channel)
                        break CLOSE
                }
        }
   ```
   However, this does not account for the idle timer firing during work. If it 
does, then the reset will not have the desired effect, and the idle clause will 
fire erroneously on a subsequent loop, since there's now a timestamp on the 
channel.
   
   ### Minimal reproduce step
   
   Here's a simple repro:
   ```go
   package main
   
   import "fmt"
   import "time"
   
   func main() {
        const idleDuration = 250*time.Millisecond
        work := make(chan int,2)
        work <- 1
        work <- 2
        t := time.NewTimer(idleDuration)
        EVENTLOOP:
        for {
                t.Reset(idleDuration)
                fmt.Println("waiting for new work")
                select {
                case w := <- work:
                        fmt.Printf("doing work on %d\n", w)
                        time.Sleep(500*time.Millisecond)
                        fmt.Println("done with work")
                case <- t.C:
                        fmt.Println("timed out")
                        break EVENTLOOP
                }
        }
   }
   ```
   
   Run that a few times, and you'll see:
   ```
   waiting for new work
   doing work on 1
   done with work
   waiting for new work
   timed out
   ```
   This is exactly what would happen in a Go function, only it would then call 
`log.Fatal`
   
   ### What did you expect to see?
   
   n/a
   
   ### What did you see instead?
   
   n/a
   
   ### Anything else?
   
   The correct fix is to call `Stop()` on the timer and, if it returns false, 
drain the channel before resetting:
   
   ```go
   CLOSE:
        for {
                select {
                case cm := <-channel:
                        // handle message
                case <-idleTimer.C:
                        close(channel)
                        break CLOSE
                }
                if !idleTimer.Stop() {
                        <-idleTimer.C
                }
        }
   
   ```
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to