This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a2838d8 fix: fix the running task lost lock and visit map (#3105)
8a2838d8 is described below

commit 8a2838d84376ff709ae4a06e63cc4c5678ebec8b
Author: mappjzc <[email protected]>
AuthorDate: Mon Sep 19 12:06:15 2022 +0800

    fix: fix the running task lost lock and visit map (#3105)
    
    Add mu.Lock() and defer mu.Unlock() before visit.
    
    Nddtfjiang <[email protected]>
---
 services/pipeline.go | 11 +++++++----
 services/task.go     |  6 +++++-
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/services/pipeline.go b/services/pipeline.go
index 42516888..a4bced5b 100644
--- a/services/pipeline.go
+++ b/services/pipeline.go
@@ -20,6 +20,11 @@ package services
 import (
        "context"
        "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+       "time"
+
        "github.com/apache/incubator-devlake/errors"
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models"
@@ -29,10 +34,6 @@ import (
        "go.temporal.io/sdk/client"
        "go.temporal.io/sdk/converter"
        "golang.org/x/sync/semaphore"
-       "os"
-       "path/filepath"
-       "strings"
-       "time"
 )
 
 var notificationService *NotificationService
@@ -198,6 +199,8 @@ func watchTemporalPipelines() {
                        if err != nil {
                                panic(err)
                        }
+                       // progressDetails will be only used in this goroutine now
+                       // So it needn't lock and unlock now
                        progressDetails := make(map[uint64]*models.TaskProgressDetail)
                        // check their status against temporal
                        for _, rp := range runningDbPipelines {
diff --git a/services/task.go b/services/task.go
index cec147a3..232db101 100644
--- a/services/task.go
+++ b/services/task.go
@@ -22,11 +22,12 @@ import (
        "encoding/json"
        goerror "errors"
        "fmt"
-       "github.com/apache/incubator-devlake/errors"
        "regexp"
        "strconv"
        "sync"
 
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/logger"
        "github.com/apache/incubator-devlake/models"
        "github.com/apache/incubator-devlake/plugins/core"
@@ -279,6 +280,9 @@ func runTaskStandalone(parentLog core.Logger, taskId uint64) errors.Error {
 }
 
 func updateTaskProgress(taskId uint64, progress chan core.RunningProgress) {
+       runningTasks.mu.Lock()
+       defer runningTasks.mu.Unlock()
+
        data := runningTasks.tasks[taskId]
        if data == nil {
                return

Reply via email to