This is an automated email from the ASF dual-hosted git repository.

ka94 pushed a commit to branch issues/5939
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/issues/5939 by this push:
     new 6d926bf33 refactor: removed temporal support
6d926bf33 is described below

commit 6d926bf33d975f73e0eaab471c864b1702739e19
Author: Keon Amini <[email protected]>
AuthorDate: Thu Sep 28 16:52:36 2023 -0500

    refactor: removed temporal support
---
 Makefile                                   |   8 +-
 backend/Makefile                           |  12 +--
 backend/core/config/config_viper.go        |   1 -
 backend/go.mod                             |   2 -
 backend/server/services/pipeline.go        | 164 +++++------------------------
 backend/server/services/pipeline_runner.go |  40 +------
 backend/worker/app/logger.go               |  66 ------------
 backend/worker/app/pipeline_workflow.go    |  82 ---------------
 backend/worker/app/shared.go               |  63 -----------
 backend/worker/app/task_activity.go        |  53 ----------
 backend/worker/main.go                     |  58 ----------
 11 files changed, 27 insertions(+), 522 deletions(-)

diff --git a/Makefile b/Makefile
index 1ce6c7027..166b1ae17 100644
--- a/Makefile
+++ b/Makefile
@@ -71,15 +71,12 @@ swag:
 build-plugin:
        make build-plugin -C backend
 
-build-worker:
-       make build-worker -C backend
-
 build-server:
        make build-server -C backend
 
 build: build-plugin build-server
 
-all: build build-worker
+all: build
 
 tap-models:
        make tap-models -C backend
@@ -87,9 +84,6 @@ tap-models:
 run:
        make run -C backend
 
-worker:
-       make worker -C backend
-
 dev:
        make dev -C backend
 
diff --git a/backend/Makefile b/backend/Makefile
index b86c95348..48dc38725 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -57,13 +57,6 @@ build-plugin:
                PLUGIN=$(PLUGIN) sh scripts/compile-plugins.sh; \
        fi
 
-build-worker:
-       if [ "$(DEBUG)" = "true" ]; then \
-               go build -gcflags='all=-N -l' -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake-worker ./worker/; \
-       else \
-               go build -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake-worker ./worker/; \
-       fi
-
 build-server: swag
        if [ "$(DEBUG)" = "true" ]; then \
                go build -gcflags='all=-N -l' -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake ./server/; \
@@ -78,14 +71,11 @@ build-python: #don't mix this with the other build commands
 
 build: build-plugin build-server
 
-all: build build-worker
+all: build
 
 run:
        go run server/main.go
 
-worker:
-       go run worker/*.go
-
 dev: build-plugin build-python run
 
 godev:
diff --git a/backend/core/config/config_viper.go b/backend/core/config/config_viper.go
index 80d1dd85d..0937f2237 100644
--- a/backend/core/config/config_viper.go
+++ b/backend/core/config/config_viper.go
@@ -102,7 +102,6 @@ func getEnvPath() string {
 func setDefaultValue(v *viper.Viper) {
        v.SetDefault("PORT", "8080")
        v.SetDefault("PLUGIN_DIR", "bin/plugins")
-       v.SetDefault("TEMPORAL_TASK_QUEUE", "DEVLAKE_TASK_QUEUE")
        v.SetDefault("REMOTE_PLUGIN_DIR", "python/plugins")
        v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
 }
diff --git a/backend/go.mod b/backend/go.mod
index 46e741447..2f692e35a 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -30,8 +30,6 @@ require (
        github.com/swaggo/swag v1.16.1
        github.com/tidwall/gjson v1.14.3
        github.com/viant/afs v1.16.0
-       go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
-       go.temporal.io/sdk v1.14.0
        golang.org/x/crypto v0.9.0
        golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f
        golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 8e72901ed..97e6d06e0 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -33,14 +33,10 @@ import (
        "github.com/apache/incubator-devlake/helpers/dbhelper"
        "github.com/apache/incubator-devlake/impls/logruslog"
        "github.com/google/uuid"
-       v11 "go.temporal.io/api/enums/v1"
-       "go.temporal.io/sdk/client"
-       "go.temporal.io/sdk/converter"
        "golang.org/x/sync/semaphore"
 )
 
 var notificationService *NotificationService
-var temporalClient client.Client
 var globalPipelineLog = logruslog.Global.Nested("pipeline service")
 
 // PipelineQuery is a query for GetPipelines
@@ -60,46 +56,32 @@ func pipelineServiceInit() {
                notificationService = NewNotificationService(notificationEndpoint, notificationSecret)
        }
 
-       // temporal client
-       var temporalUrl = cfg.GetString("TEMPORAL_URL")
-       if temporalUrl != "" {
-               // TODO: logger
-               var err error
-               temporalClient, err = client.NewClient(client.Options{
-                       HostPort: temporalUrl,
-               })
-               if err != nil {
-                       panic(err)
-               }
-               watchTemporalPipelines()
-       } else {
-               // standalone mode: reset pipeline status
-               errMsg := "The process was terminated unexpectedly"
-               err := db.UpdateColumns(
-                       &models.Pipeline{},
-                       []dal.DalSet{
-                               {ColumnName: "status", Value: 
models.TASK_FAILED},
-                               {ColumnName: "message", Value: errMsg},
-                       },
-                       dal.Where("status = ?", models.TASK_RUNNING),
-               )
-               if err != nil {
-                       panic(err)
-               }
-               err = db.UpdateColumns(
-                       &models.Task{},
-                       []dal.DalSet{
-                               {ColumnName: "status", Value: 
models.TASK_FAILED},
-                               {ColumnName: "message", Value: errMsg},
-                       },
-                       dal.Where("status = ?", models.TASK_RUNNING),
-               )
-               if err != nil {
-                       panic(err)
-               }
+       // standalone mode: reset pipeline status
+       errMsg := "The process was terminated unexpectedly"
+       err := db.UpdateColumns(
+               &models.Pipeline{},
+               []dal.DalSet{
+                       {ColumnName: "status", Value: models.TASK_FAILED},
+                       {ColumnName: "message", Value: errMsg},
+               },
+               dal.Where("status = ?", models.TASK_RUNNING),
+       )
+       if err != nil {
+               panic(err)
+       }
+       err = db.UpdateColumns(
+               &models.Task{},
+               []dal.DalSet{
+                       {ColumnName: "status", Value: models.TASK_FAILED},
+                       {ColumnName: "message", Value: errMsg},
+               },
+               dal.Where("status = ?", models.TASK_RUNNING),
+       )
+       if err != nil {
+               panic(err)
        }
 
-       err := ReloadBlueprints(cronManager)
+       err = ReloadBlueprints(cronManager)
        if err != nil {
                panic(err)
        }
@@ -268,101 +250,6 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
        }
 }
 
-func watchTemporalPipelines() {
-       ticker := time.NewTicker(3 * time.Second)
-       dc := converter.GetDefaultDataConverter()
-       go func() {
-               // run forever
-               for range ticker.C {
-                       // load all running pipeline from database
-                       runningDbPipelines := make([]models.Pipeline, 0)
-                       err := db.All(&runningDbPipelines, dal.Where("status = 
?", models.TASK_RUNNING))
-                       if err != nil {
-                               panic(err)
-                       }
-                       // progressDetails will be only used in this goroutine 
now
-                       // So it needn't lock and unlock now
-                       progressDetails := 
make(map[uint64]*models.TaskProgressDetail)
-                       // check their status against temporal
-                       for _, rp := range runningDbPipelines {
-                               workflowId := getTemporalWorkflowId(rp.ID)
-                               desc, err := 
temporalClient.DescribeWorkflowExecution(
-                                       context.Background(),
-                                       workflowId,
-                                       "",
-                               )
-                               if err != nil {
-                                       globalPipelineLog.Error(err, "failed to 
query workflow execution: %v", err)
-                                       continue
-                               }
-                               // workflow is terminated by outsider
-                               s := desc.WorkflowExecutionInfo.Status
-                               if s != v11.WORKFLOW_EXECUTION_STATUS_RUNNING {
-                                       rp.Status = models.TASK_COMPLETED
-                                       if s != 
v11.WORKFLOW_EXECUTION_STATUS_COMPLETED {
-                                               rp.Status = models.TASK_FAILED
-                                               // get error message
-                                               hisIter := 
temporalClient.GetWorkflowHistory(
-                                                       context.Background(),
-                                                       workflowId,
-                                                       "",
-                                                       false,
-                                                       
v11.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
-                                               )
-                                               for hisIter.HasNext() {
-                                                       his, err := 
hisIter.Next()
-                                                       if err != nil {
-                                                               
globalPipelineLog.Error(err, "failed to get next from workflow history 
iterator: %v", err)
-                                                               continue
-                                                       }
-                                                       rp.Message = 
fmt.Sprintf("temporal event type: %v", his.GetEventType())
-                                               }
-                                       }
-                                       rp.FinishedAt = 
desc.WorkflowExecutionInfo.CloseTime
-                                       err = db.UpdateColumns(rp, []dal.DalSet{
-                                               {ColumnName: "status", Value: 
rp.Status},
-                                               {ColumnName: "message", Value: 
rp.Message},
-                                               {ColumnName: "finished_at", 
Value: rp.FinishedAt},
-                                       })
-                                       if err != nil {
-                                               globalPipelineLog.Error(err, 
"failed to update db: %v", err)
-                                       }
-                                       continue
-                               }
-
-                               // check pending activity
-                               for _, activity := range desc.PendingActivities 
{
-                                       taskId, err := 
getTaskIdFromActivityId(activity.ActivityId)
-                                       if err != nil {
-                                               globalPipelineLog.Error(err, 
"unable to extract task id from activity id `%s`", activity.ActivityId)
-                                               continue
-                                       }
-                                       progressDetail := 
&models.TaskProgressDetail{}
-                                       progressDetails[taskId] = progressDetail
-                                       heartbeats := 
activity.GetHeartbeatDetails()
-                                       if heartbeats == nil {
-                                               continue
-                                       }
-                                       payloads := heartbeats.GetPayloads()
-                                       if len(payloads) == 0 {
-                                               return
-                                       }
-                                       lastPayload := payloads[len(payloads)-1]
-                                       if err := dc.FromPayload(lastPayload, 
progressDetail); err != nil {
-                                               globalPipelineLog.Error(err, 
"failed to unmarshal heartbeat payload: %v", err)
-                                               continue
-                                       }
-                               }
-                       }
-                       runningTasks.setAll(progressDetails)
-               }
-       }()
-}
-
-func getTemporalWorkflowId(pipelineId uint64) string {
-       return fmt.Sprintf("pipeline #%d", pipelineId)
-}
-
 // NotifyExternal FIXME ...
 func NotifyExternal(pipelineId uint64) errors.Error {
        if notificationService == nil {
@@ -415,9 +302,6 @@ func CancelPipeline(pipelineId uint64) errors.Error {
                // the target pipeline is pending, no running, no need to perform the actual cancel operation
                return nil
        }
-       if temporalClient != nil {
-               return 
errors.Convert(temporalClient.CancelWorkflow(context.Background(), 
getTemporalWorkflowId(pipelineId), ""))
-       }
        pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}})
        if err != nil {
                return errors.Convert(err)
diff --git a/backend/server/services/pipeline_runner.go b/backend/server/services/pipeline_runner.go
index cb3d27f4a..c11390315 100644
--- a/backend/server/services/pipeline_runner.go
+++ b/backend/server/services/pipeline_runner.go
@@ -19,7 +19,6 @@ package services
 
 import (
        "context"
-       "encoding/json"
        "fmt"
        "github.com/apache/incubator-devlake/core/dal"
        "github.com/apache/incubator-devlake/core/errors"
@@ -27,8 +26,6 @@ import (
        "github.com/apache/incubator-devlake/core/models"
        "github.com/apache/incubator-devlake/core/runner"
        "github.com/apache/incubator-devlake/impls/logruslog"
-       "github.com/apache/incubator-devlake/worker/app"
-       "go.temporal.io/sdk/client"
        "time"
 )
 
@@ -47,37 +44,6 @@ func (p *pipelineRunner) runPipelineStandalone() errors.Error {
        )
 }
 
-func (p *pipelineRunner) runPipelineViaTemporal() errors.Error {
-       workflowOpts := client.StartWorkflowOptions{
-               ID:        getTemporalWorkflowId(p.pipeline.ID),
-               TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"),
-       }
-       // send only the very basis data
-       configJson, err := json.Marshal(cfg.AllSettings())
-       if err != nil {
-               return errors.Convert(err)
-       }
-       p.logger.Info("enqueue pipeline #%d into temporal task queue", 
p.pipeline.ID)
-       workflow, err := temporalClient.ExecuteWorkflow(
-               context.Background(),
-               workflowOpts,
-               app.DevLakePipelineWorkflow,
-               configJson,
-               p.pipeline.ID,
-               p.logger.GetConfig(),
-       )
-       if err != nil {
-               p.logger.Error(err, "failed to enqueue pipeline #%d into 
temporal", p.pipeline.ID)
-               return errors.Convert(err)
-       }
-       err = workflow.Get(context.Background(), nil)
-       if err != nil {
-               p.logger.Info("failed to execute pipeline #%d via temporal: 
%v", p.pipeline.ID, err)
-       }
-       p.logger.Info("pipeline #%d finished by temporal", p.pipeline.ID)
-       return errors.Convert(err)
-}
-
 // GetPipelineLogger returns logger for the pipeline
 func GetPipelineLogger(pipeline *models.Pipeline) log.Logger {
        pipelineLogger := globalPipelineLog.Nested(
@@ -107,11 +73,7 @@ func runPipeline(pipelineId uint64) errors.Error {
                pipeline: ppl,
        }
        // run
-       if temporalClient != nil {
-               err = pipelineRun.runPipelineViaTemporal()
-       } else {
-               err = pipelineRun.runPipelineStandalone()
-       }
+       err = pipelineRun.runPipelineStandalone()
        isCancelled := errors.Is(err, context.Canceled)
        if err != nil {
                err = errors.Default.Wrap(err, fmt.Sprintf("Error running pipeline %d.", pipelineId))
diff --git a/backend/worker/app/logger.go b/backend/worker/app/logger.go
deleted file mode 100644
index 7e1ca7c04..000000000
--- a/backend/worker/app/logger.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
-       "fmt"
-       core "github.com/apache/incubator-devlake/core/log"
-       "go.temporal.io/sdk/log"
-)
-
-// TemporalLogger FIXME ...
-type TemporalLogger struct {
-       log core.Logger
-}
-
-// NewTemporalLogger FIXME ...
-func NewTemporalLogger(coreLogger core.Logger) log.Logger {
-       return &TemporalLogger{
-               coreLogger,
-       }
-}
-
-// Log FIXME ...
-func (l *TemporalLogger) Log(lv core.LogLevel, msg string, keyvals 
...interface{}) {
-       if l.log.IsLevelEnabled(lv) {
-               for i := 0; i < len(keyvals); i += 2 {
-                       msg += fmt.Sprintf(" %s %v", keyvals[i], keyvals[i+1])
-               }
-               l.log.Log(lv, msg)
-       }
-}
-
-// Debug FIXME ...
-func (l *TemporalLogger) Debug(msg string, keyvals ...interface{}) {
-       l.Log(core.LOG_DEBUG, msg, keyvals...)
-}
-
-// Info FIXME ...
-func (l *TemporalLogger) Info(msg string, keyvals ...interface{}) {
-       l.Log(core.LOG_INFO, msg, keyvals...)
-}
-
-// Warn FIXME ...
-func (l *TemporalLogger) Warn(msg string, keyvals ...interface{}) {
-       l.Log(core.LOG_WARN, msg, keyvals...)
-}
-
-// Error FIXME ...
-func (l *TemporalLogger) Error(msg string, keyvals ...interface{}) {
-       l.Log(core.LOG_ERROR, msg, keyvals...)
-}
diff --git a/backend/worker/app/pipeline_workflow.go b/backend/worker/app/pipeline_workflow.go
deleted file mode 100644
index de6350b54..000000000
--- a/backend/worker/app/pipeline_workflow.go
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
-       "fmt"
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/log"
-       "github.com/apache/incubator-devlake/core/runner"
-       "strings"
-       "time"
-
-       "go.temporal.io/sdk/workflow"
-)
-
-// DevLakePipelineWorkflow FIXME ...
-func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte, 
pipelineId uint64, loggerConfig *log.LoggerConfig) errors.Error {
-       basicRes, err := loadResources(configJson, loggerConfig)
-       if err != nil {
-               return errors.Convert(err)
-       }
-       logger := basicRes.GetLogger()
-       logger.Info("received pipeline #%d", pipelineId)
-       err = runner.RunPipeline(
-               basicRes,
-               pipelineId,
-               func(taskIds []uint64) errors.Error {
-                       return runTasks(ctx, configJson, taskIds, logger)
-               },
-       )
-       if err != nil {
-               logger.Error(err, "failed to execute pipeline #%d", pipelineId)
-       }
-       logger.Info("finished pipeline #%d", pipelineId)
-       return err
-}
-
-func runTasks(ctx workflow.Context, configJson []byte, taskIds []uint64, 
logger log.Logger) errors.Error {
-       cleanExit := false
-       defer func() {
-               if !cleanExit {
-                       logger.Error(nil, "fatal error while executing task 
Ids: %v", taskIds)
-               }
-       }()
-       futures := make([]workflow.Future, len(taskIds))
-       for i, taskId := range taskIds {
-               activityOpts := workflow.ActivityOptions{
-                       ActivityID:          fmt.Sprintf("task #%d", taskId),
-                       StartToCloseTimeout: 24 * time.Hour,
-                       WaitForCancellation: true,
-               }
-               activityCtx := workflow.WithActivityOptions(ctx, activityOpts)
-               futures[i] = workflow.ExecuteActivity(activityCtx, 
DevLakeTaskActivity, configJson, taskId, logger)
-       }
-       errs := make([]string, 0)
-       for _, future := range futures {
-               err := future.Get(ctx, nil)
-               if err != nil {
-                       errs = append(errs, err.Error())
-               }
-       }
-       cleanExit = true
-       if len(errs) > 0 {
-               return errors.Default.New(strings.Join(errs, "\n"))
-       }
-       return nil
-}
diff --git a/backend/worker/app/shared.go b/backend/worker/app/shared.go
deleted file mode 100644
index 6b52e7801..000000000
--- a/backend/worker/app/shared.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
-       "bytes"
-       "github.com/apache/incubator-devlake/core/context"
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/log"
-       "github.com/apache/incubator-devlake/core/runner"
-       "github.com/apache/incubator-devlake/impls/logruslog"
-       "github.com/spf13/viper"
-)
-
-func loadResources(configJson []byte, loggerConfig *log.LoggerConfig) 
(context.BasicRes, errors.Error) {
-       // TODO: should be redirected to server
-       globalLogger := logruslog.Global.Nested("worker")
-       // prepare
-       cfg := viper.New()
-       cfg.SetConfigType("json")
-       err := cfg.ReadConfig(bytes.NewBuffer(configJson))
-       if err != nil {
-               globalLogger.Error(err, "failed to load resources")
-               return nil, errors.Convert(err)
-       }
-       db, err := runner.NewGormDb(cfg, globalLogger)
-       if err != nil {
-               return nil, errors.Convert(err)
-       }
-       logger, err := getWorkerLogger(globalLogger, loggerConfig)
-       if err != nil {
-               return nil, errors.Convert(err)
-       }
-       return runner.CreateBasicRes(cfg, logger, db), nil
-}
-
-func getWorkerLogger(logger log.Logger, logConfig *log.LoggerConfig) 
(log.Logger, errors.Error) {
-       newLogger := logger.Nested(logConfig.Prefix)
-       stream, err := logruslog.GetFileStream(logConfig.Path)
-       if err != nil {
-               return nil, err
-       }
-       newLogger.SetStream(&log.LoggerStreamConfig{
-               Path:   logConfig.Path,
-               Writer: stream,
-       })
-       return newLogger, nil
-}
diff --git a/backend/worker/app/task_activity.go b/backend/worker/app/task_activity.go
deleted file mode 100644
index 87fea1ae4..000000000
--- a/backend/worker/app/task_activity.go
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
-       "context"
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/log"
-       "github.com/apache/incubator-devlake/core/models"
-       "github.com/apache/incubator-devlake/core/plugin"
-       "github.com/apache/incubator-devlake/core/runner"
-       "go.temporal.io/sdk/activity"
-)
-
-// DevLakeTaskActivity FIXME ...
-func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId 
uint64, loggerConfig *log.LoggerConfig) errors.Error {
-       basicRes, err := loadResources(configJson, loggerConfig)
-       if err != nil {
-               return err
-       }
-       logger := basicRes.GetLogger()
-       logger.Info("received task #%d", taskId)
-       progressDetail := &models.TaskProgressDetail{}
-       progChan := make(chan plugin.RunningProgress)
-       defer close(progChan)
-       go func() {
-               for p := range progChan {
-                       runner.UpdateProgressDetail(basicRes, taskId, 
progressDetail, &p)
-                       activity.RecordHeartbeat(ctx, progressDetail)
-               }
-       }()
-       err = runner.RunTask(ctx, basicRes, progChan, taskId)
-       if err != nil {
-               logger.Error(err, "failed to execute task #%d", taskId)
-       }
-       logger.Info("finished task #%d", taskId)
-       return err
-}
diff --git a/backend/worker/main.go b/backend/worker/main.go
deleted file mode 100644
index 58a008153..000000000
--- a/backend/worker/main.go
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
-       "github.com/apache/incubator-devlake/core/errors"
-       "github.com/apache/incubator-devlake/core/runner"
-       _ "github.com/apache/incubator-devlake/core/version"
-       "github.com/apache/incubator-devlake/impls/logruslog"
-       "github.com/apache/incubator-devlake/worker/app"
-       "go.temporal.io/sdk/client"
-       "go.temporal.io/sdk/worker"
-       "log"
-)
-
-func main() {
-       basicRes := runner.CreateAppBasicRes()
-       err := runner.LoadPlugins(basicRes)
-       if err != nil {
-               panic(err)
-       }
-
-       // establish temporal connection
-       TASK_QUEUE := basicRes.GetConfig("TEMPORAL_TASK_QUEUE")
-       // Create the client object just once per process
-       c, err := errors.Convert01(client.NewClient(client.Options{
-               HostPort: basicRes.GetConfig("TEMPORAL_URL"),
-               Logger:   app.NewTemporalLogger(logruslog.Global),
-       }))
-       if err != nil {
-               log.Fatalln("unable to create Temporal client", err)
-       }
-       defer c.Close()
-       // This worker hosts both Workflow and Activity functions
-       w := worker.New(c, TASK_QUEUE, worker.Options{})
-       w.RegisterWorkflow(app.DevLakePipelineWorkflow)
-       w.RegisterActivity(app.DevLakeTaskActivity)
-       // Start listening to the Task Queue
-       err = errors.Convert(w.Run(worker.InterruptCh()))
-       if err != nil {
-               log.Fatalln("unable to start Worker", err)
-       }
-}

Reply via email to