This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new dfb2d5e54ff [fix][fn]Reset idle timer correctly (#20450)
dfb2d5e54ff is described below
commit dfb2d5e54fff606bcfa4b0291657341b33bd4d44
Author: Andy Walker <[email protected]>
AuthorDate: Thu Jun 1 13:42:04 2023 -0400
[fix][fn]Reset idle timer correctly (#20450)
Co-authored-by: Andy Walker <[email protected]>
Fix apache/pulsar#20449
<!-- or this PR is one task of an issue -->
Master Issue: #20449
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is a trivial rework / code cleanup without any test coverage.
### Does this pull request potentially affect one of the following parts:
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update
later -->
- [X] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
### Matching PR in forked repository
PR in forked repository: https://github.com/flowchartsman/pulsar/pull/5
(cherry picked from commit e05b890b804cee781e87db750f2afc3a382fa1d2)
---
pulsar-function-go/pf/instance.go | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/pulsar-function-go/pf/instance.go
b/pulsar-function-go/pf/instance.go
index def254b46bb..c5b7500803d 100644
--- a/pulsar-function-go/pf/instance.go
+++ b/pulsar-function-go/pf/instance.go
@@ -149,7 +149,6 @@ func (gi *goInstance) startFunction(function function)
error {
defer metricsServicer.close()
CLOSE:
for {
- idleTimer.Reset(idleDuration)
select {
case cm := <-channel:
msgInput := cm.Message
@@ -182,6 +181,11 @@ CLOSE:
close(channel)
break CLOSE
}
+ // reset the idle timer and drain if appropriate before the
next loop
+ if !idleTimer.Stop() {
+ <-idleTimer.C
+ }
+ idleTimer.Reset(idleDuration)
}
gi.closeLogTopic()