This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5b44d3637c8 [Playground] Stop tickers to avoid leaks. (#26507)
5b44d3637c8 is described below
commit 5b44d3637c82d8844515e701fa14a065d7849015
Author: Robert Burke <[email protected]>
AuthorDate: Wed May 3 16:39:43 2023 -0700
[Playground] Stop tickers to avoid leaks. (#26507)
Co-authored-by: lostluck <[email protected]>
---
playground/backend/internal/cache/local/local_cache.go | 1 +
playground/backend/internal/code_processing/code_processing.go | 6 +++---
playground/backend/internal/db/datastore/emulator_wrapper.go | 2 ++
playground/backend/internal/emulators/kafka.go | 7 ++++---
4 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/playground/backend/internal/cache/local/local_cache.go
b/playground/backend/internal/cache/local/local_cache.go
index 72568540710..881b6b523d2 100644
--- a/playground/backend/internal/cache/local/local_cache.go
+++ b/playground/backend/internal/cache/local/local_cache.go
@@ -166,6 +166,7 @@ func (lc *Cache) GetSdkCatalog(_ context.Context)
([]*entity.SDKEntity, error) {
func (lc *Cache) startGC(ctx context.Context) {
ticker := time.NewTicker(lc.cleanupInterval)
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
diff --git a/playground/backend/internal/code_processing/code_processing.go
b/playground/backend/internal/code_processing/code_processing.go
index be4c88709ba..d371860430e 100644
--- a/playground/backend/internal/code_processing/code_processing.go
+++ b/playground/backend/internal/code_processing/code_processing.go
@@ -434,10 +434,10 @@ func reconcileBackgroundTask(pipelineLifeCycleCtx
context.Context, pipelineId uu
// If cancel flag exists, and it is true it means that the code processing was
canceled. Set true to cancelChannel and return.
func cancelCheck(ctx context.Context, pipelineId uuid.UUID, cancelFunc
context.CancelFunc, cacheService cache.Cache) {
ticker := time.NewTicker(pauseDuration)
+ defer ticker.Stop()
for {
select {
case <-ctx.Done():
- ticker.Stop()
return
case <-ticker.C:
// Use background context for the cache operation to
avoid failure when the main context is timed out.
@@ -460,12 +460,12 @@ func cancelCheck(ctx context.Context, pipelineId
uuid.UUID, cancelFunc context.C
// In other case each pauseDuration checks that graph file exists or not and
try to save it to the cache.
func readGraphFile(pipelineLifeCycleCtx context.Context, cacheService
cache.Cache, graphFilePath string, pipelineId uuid.UUID) {
ticker := time.NewTicker(pauseDuration)
+ defer ticker.Stop()
for {
select {
// waiting when graph file appears
case <-ticker.C:
if _, err := os.Stat(graphFilePath); err == nil {
- ticker.Stop()
graph, err := os.ReadFile(graphFilePath)
if err != nil {
logger.Errorf("%s: Error during saving
graph to the file: %s", pipelineId, err.Error())
@@ -474,7 +474,6 @@ func readGraphFile(pipelineLifeCycleCtx context.Context,
cacheService cache.Cach
}
// in case of timeout or cancel
case <-pipelineLifeCycleCtx.Done():
- ticker.Stop()
if _, err := os.Stat(graphFilePath); err == nil {
graph, err := os.ReadFile(graphFilePath)
if err != nil {
@@ -497,6 +496,7 @@ func readGraphFile(pipelineLifeCycleCtx context.Context,
cacheService cache.Cach
// In other case each pauseDuration write to cache logs of the code processing.
func readLogFile(pipelineLifeCycleCtx context.Context, cacheService
cache.Cache, logFilePath string, pipelineId uuid.UUID, stopReadLogsChannel,
finishReadLogChannel chan bool) {
ticker := time.NewTicker(pauseDuration)
+ defer ticker.Stop()
for {
select {
// in case of timeout or cancel
diff --git a/playground/backend/internal/db/datastore/emulator_wrapper.go
b/playground/backend/internal/db/datastore/emulator_wrapper.go
index 9252c454b01..f1cf0206b66 100644
--- a/playground/backend/internal/db/datastore/emulator_wrapper.go
+++ b/playground/backend/internal/db/datastore/emulator_wrapper.go
@@ -130,7 +130,9 @@ func startEmulator() (*emulator, error) {
ok := func() bool {
workTicker := time.NewTicker(pauseDuration)
+ defer workTicker.Stop()
globalTicker := time.NewTicker(waitDuration)
+ defer globalTicker.Stop()
for {
select {
case <-workTicker.C:
diff --git a/playground/backend/internal/emulators/kafka.go
b/playground/backend/internal/emulators/kafka.go
index 58112439c67..031373b47b0 100644
--- a/playground/backend/internal/emulators/kafka.go
+++ b/playground/backend/internal/emulators/kafka.go
@@ -16,7 +16,6 @@
package emulators
import (
- "beam.apache.org/playground/backend/internal/logger"
"bufio"
"encoding/json"
"errors"
@@ -28,10 +27,10 @@ import (
"strings"
"time"
+ "beam.apache.org/playground/backend/internal/constants"
+ "beam.apache.org/playground/backend/internal/logger"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/linkedin/goavro"
-
- "beam.apache.org/playground/backend/internal/constants"
)
const (
@@ -109,7 +108,9 @@ func NewKafkaMockCluster(emulatorExecutablePath string)
(*KafkaMockCluster, erro
bootstrapServers := fmt.Sprintf("%s%s%s", host, addressSeperator, port)
workTicker := time.NewTicker(pauseDuration)
+ defer workTicker.Stop()
globalTicker := time.NewTicker(globalDuration)
+ defer globalTicker.Stop()
for {
select {
case <-workTicker.C: