This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 28c3d1c [BEAM-13224][Playground] Add using context to local cache to
stop goroutine
new 93e5e61 Merge pull request #15959 from [BEAM-13224][Playground] Local
cache creates an forever used goroutine
28c3d1c is described below
commit 28c3d1c65fd0a2b3d1e8b5cc8c63a7dc71469c1a
Author: AydarZaynutdinov <[email protected]>
AuthorDate: Fri Nov 12 13:36:53 2021 +0300
[BEAM-13224][Playground]
Add using context to local cache to stop goroutine
---
.../backend/internal/cache/local/local_cache.go | 23 +++++++++++++---------
.../internal/cache/local/local_cache_test.go | 8 +++++++-
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/playground/backend/internal/cache/local/local_cache.go
b/playground/backend/internal/cache/local/local_cache.go
index fb78bba..6598d09 100644
--- a/playground/backend/internal/cache/local/local_cache.go
+++ b/playground/backend/internal/cache/local/local_cache.go
@@ -44,7 +44,7 @@ func New(ctx context.Context) *Cache {
pipelinesExpiration: pipelinesExpiration,
}
- go ls.startGC()
+ go ls.startGC(ctx)
return ls
}
@@ -99,16 +99,21 @@ func (lc *Cache) SetExpTime(ctx context.Context, pipelineId
uuid.UUID, expTime t
return nil
}
-func (lc *Cache) startGC() {
+func (lc *Cache) startGC(ctx context.Context) {
+ ticker := time.NewTicker(lc.cleanupInterval)
for {
- <-time.After(lc.cleanupInterval)
-
- if lc.items == nil {
+ select {
+ case <-ctx.Done():
+ ticker.Stop()
return
- }
-
- if pipelines := lc.expiredPipelines(); len(pipelines) != 0 {
- lc.clearItems(pipelines)
+ case <-ticker.C:
+ if lc.items == nil {
+ return
+ }
+
+ if pipelines := lc.expiredPipelines(); len(pipelines)
!= 0 {
+ lc.clearItems(pipelines)
+ }
}
}
}
diff --git a/playground/backend/internal/cache/local/local_cache_test.go
b/playground/backend/internal/cache/local/local_cache_test.go
index 4964dbe..8bdb45d 100644
--- a/playground/backend/internal/cache/local/local_cache_test.go
+++ b/playground/backend/internal/cache/local/local_cache_test.go
@@ -19,6 +19,7 @@ import (
"beam.apache.org/playground/backend/internal/cache"
"context"
"github.com/google/uuid"
+ "go.uber.org/goleak"
"reflect"
"testing"
"time"
@@ -228,6 +229,11 @@ func TestLocalCache_SetExpTime(t *testing.T) {
}
func TestLocalCache_startGC(t *testing.T) {
+ defer goleak.VerifyNone(t)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
preparedId, _ := uuid.NewUUID()
preparedItemsMap := make(map[uuid.UUID]map[cache.SubKey]interface{})
preparedItemsMap[preparedId] = make(map[cache.SubKey]interface{})
@@ -260,7 +266,7 @@ func TestLocalCache_startGC(t *testing.T) {
items: tt.fields.items,
pipelinesExpiration:
tt.fields.pipelinesExpiration,
}
- go lc.startGC()
+ go lc.startGC(ctx)
time.Sleep(time.Millisecond)
if len(tt.fields.items) != 0 {
t.Errorf("Pipeline: %s not deleted in time.",
preparedId)