This is an automated email from the ASF dual-hosted git repository. ka94 pushed a commit to branch issues/5939 in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit 974e8b3a7bb938fb2694ad61b5767649ec3df215 Author: Keon Amini <[email protected]> AuthorDate: Thu Sep 28 16:52:36 2023 -0500 refactor: removed temporal support --- Makefile | 8 +- backend/Makefile | 12 +-- backend/core/config/config_viper.go | 1 - backend/go.mod | 2 - backend/server/services/pipeline.go | 164 +++++------------------------ backend/server/services/pipeline_runner.go | 40 +------ backend/server/services/task.go | 2 - backend/server/services/task_runner.go | 26 ----- backend/worker/app/logger.go | 66 ------------ backend/worker/app/pipeline_workflow.go | 82 --------------- backend/worker/app/shared.go | 63 ----------- backend/worker/app/task_activity.go | 53 ---------- backend/worker/main.go | 58 ---------- 13 files changed, 27 insertions(+), 550 deletions(-) diff --git a/Makefile b/Makefile index 1ce6c7027..166b1ae17 100644 --- a/Makefile +++ b/Makefile @@ -71,15 +71,12 @@ swag: build-plugin: make build-plugin -C backend -build-worker: - make build-worker -C backend - build-server: make build-server -C backend build: build-plugin build-server -all: build build-worker +all: build tap-models: make tap-models -C backend @@ -87,9 +84,6 @@ tap-models: run: make run -C backend -worker: - make worker -C backend - dev: make dev -C backend diff --git a/backend/Makefile b/backend/Makefile index b86c95348..48dc38725 100644 --- a/backend/Makefile +++ b/backend/Makefile @@ -57,13 +57,6 @@ build-plugin: PLUGIN=$(PLUGIN) sh scripts/compile-plugins.sh; \ fi -build-worker: - if [ "$(DEBUG)" = "true" ]; then \ - go build -gcflags='all=-N -l' -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake-worker ./worker/; \ - else \ - go build -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake-worker ./worker/; \ - fi - build-server: swag if [ "$(DEBUG)" = "true" ]; then \ go build -gcflags='all=-N -l' -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake ./server/; \ @@ -78,14 +71,11 @@ build-python: #don't mix this with the other build commands build: build-plugin build-server -all: build build-worker +all: build run: go run server/main.go -worker: - go run worker/*.go - dev: build-plugin build-python run godev: diff --git a/backend/core/config/config_viper.go b/backend/core/config/config_viper.go index 80d1dd85d..0937f2237 100644 --- a/backend/core/config/config_viper.go +++ b/backend/core/config/config_viper.go @@ -102,7 +102,6 @@ func getEnvPath() string { func setDefaultValue(v *viper.Viper) { v.SetDefault("PORT", "8080") v.SetDefault("PLUGIN_DIR", "bin/plugins") - v.SetDefault("TEMPORAL_TASK_QUEUE", "DEVLAKE_TASK_QUEUE") v.SetDefault("REMOTE_PLUGIN_DIR", "python/plugins") v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger") } diff --git a/backend/go.mod b/backend/go.mod index 46e741447..2f692e35a 100644 --- a/backend/go.mod +++ b/backend/go.mod @@ -30,8 +30,6 @@ require ( github.com/swaggo/swag v1.16.1 github.com/tidwall/gjson v1.14.3 github.com/viant/afs v1.16.0 - go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a - go.temporal.io/sdk v1.14.0 golang.org/x/crypto v0.9.0 golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go index 8e72901ed..97e6d06e0 100644 --- a/backend/server/services/pipeline.go +++ b/backend/server/services/pipeline.go @@ -33,14 +33,10 @@ import ( "github.com/apache/incubator-devlake/helpers/dbhelper" "github.com/apache/incubator-devlake/impls/logruslog" "github.com/google/uuid" - v11 "go.temporal.io/api/enums/v1" - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" "golang.org/x/sync/semaphore" ) var notificationService *NotificationService -var temporalClient client.Client var globalPipelineLog = logruslog.Global.Nested("pipeline service") // PipelineQuery is a query for GetPipelines @@ -60,46 +56,32 @@ func pipelineServiceInit() { notificationService = NewNotificationService(notificationEndpoint, notificationSecret) } - // temporal client - var temporalUrl = cfg.GetString("TEMPORAL_URL") - if temporalUrl != "" { - // TODO: logger - var err error - temporalClient, err = client.NewClient(client.Options{ - HostPort: temporalUrl, - }) - if err != nil { - panic(err) - } - watchTemporalPipelines() - } else { - // standalone mode: reset pipeline status - errMsg := "The process was terminated unexpectedly" - err := db.UpdateColumns( - &models.Pipeline{}, - []dal.DalSet{ - {ColumnName: "status", Value: models.TASK_FAILED}, - {ColumnName: "message", Value: errMsg}, - }, - dal.Where("status = ?", models.TASK_RUNNING), - ) - if err != nil { - panic(err) - } - err = db.UpdateColumns( - &models.Task{}, - []dal.DalSet{ - {ColumnName: "status", Value: models.TASK_FAILED}, - {ColumnName: "message", Value: errMsg}, - }, - dal.Where("status = ?", models.TASK_RUNNING), - ) - if err != nil { - panic(err) - } + // standalone mode: reset pipeline status + errMsg := "The process was terminated unexpectedly" + err := db.UpdateColumns( + &models.Pipeline{}, + []dal.DalSet{ + {ColumnName: "status", Value: models.TASK_FAILED}, + {ColumnName: "message", Value: errMsg}, + }, + dal.Where("status = ?", models.TASK_RUNNING), + ) + if err != nil { + panic(err) + } + err = db.UpdateColumns( + &models.Task{}, + []dal.DalSet{ + {ColumnName: "status", Value: models.TASK_FAILED}, + {ColumnName: "message", Value: errMsg}, + }, + dal.Where("status = ?", models.TASK_RUNNING), + ) + if err != nil { + panic(err) } - err := ReloadBlueprints(cronManager) + err = ReloadBlueprints(cronManager) if err != nil { panic(err) } @@ -268,101 +250,6 @@ func RunPipelineInQueue(pipelineMaxParallel int64) { } } -func watchTemporalPipelines() { - ticker := time.NewTicker(3 * time.Second) - dc := converter.GetDefaultDataConverter() - go func() { - // run forever - for range ticker.C { - // load all running pipeline from database - runningDbPipelines := make([]models.Pipeline, 0) - err := db.All(&runningDbPipelines, dal.Where("status = ?", models.TASK_RUNNING)) - if err != nil { - panic(err) - } - // progressDetails will be only used in this goroutine now - // So it needn't lock and unlock now - progressDetails := make(map[uint64]*models.TaskProgressDetail) - // check their status against temporal - for _, rp := range runningDbPipelines { - workflowId := getTemporalWorkflowId(rp.ID) - desc, err := temporalClient.DescribeWorkflowExecution( - context.Background(), - workflowId, - "", - ) - if err != nil { - globalPipelineLog.Error(err, "failed to query workflow execution: %v", err) - continue - } - // workflow is terminated by outsider - s := desc.WorkflowExecutionInfo.Status - if s != v11.WORKFLOW_EXECUTION_STATUS_RUNNING { - rp.Status = models.TASK_COMPLETED - if s != v11.WORKFLOW_EXECUTION_STATUS_COMPLETED { - rp.Status = models.TASK_FAILED - // get error message - hisIter := temporalClient.GetWorkflowHistory( - context.Background(), - workflowId, - "", - false, - v11.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT, - ) - for hisIter.HasNext() { - his, err := hisIter.Next() - if err != nil { - globalPipelineLog.Error(err, "failed to get next from workflow history iterator: %v", err) - continue - } - rp.Message = fmt.Sprintf("temporal event type: %v", his.GetEventType()) - } - } - rp.FinishedAt = desc.WorkflowExecutionInfo.CloseTime - err = db.UpdateColumns(rp, []dal.DalSet{ - {ColumnName: "status", Value: rp.Status}, - {ColumnName: "message", Value: rp.Message}, - {ColumnName: "finished_at", Value: rp.FinishedAt}, - }) - if err != nil { - globalPipelineLog.Error(err, "failed to update db: %v", err) - } - continue - } - - // check pending activity - for _, activity := range desc.PendingActivities { - taskId, err := getTaskIdFromActivityId(activity.ActivityId) - if err != nil { - globalPipelineLog.Error(err, "unable to extract task id from activity id `%s`", activity.ActivityId) - continue - } - progressDetail := &models.TaskProgressDetail{} - progressDetails[taskId] = progressDetail - heartbeats := activity.GetHeartbeatDetails() - if heartbeats == nil { - continue - } - payloads := heartbeats.GetPayloads() - if len(payloads) == 0 { - return - } - lastPayload := payloads[len(payloads)-1] - if err := dc.FromPayload(lastPayload, progressDetail); err != nil { - globalPipelineLog.Error(err, "failed to unmarshal heartbeat payload: %v", err) - continue - } - } - } - runningTasks.setAll(progressDetails) - } - }() -} - -func getTemporalWorkflowId(pipelineId uint64) string { - return fmt.Sprintf("pipeline #%d", pipelineId) -} - // NotifyExternal FIXME ... func NotifyExternal(pipelineId uint64) errors.Error { if notificationService == nil { @@ -415,9 +302,6 @@ func CancelPipeline(pipelineId uint64) errors.Error { // the target pipeline is pending, no running, no need to perform the actual cancel operation return nil } - if temporalClient != nil { - return errors.Convert(temporalClient.CancelWorkflow(context.Background(), getTemporalWorkflowId(pipelineId), "")) - } pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId, Pending: 1, Pagination: Pagination{PageSize: -1}}) if err != nil { return errors.Convert(err) diff --git a/backend/server/services/pipeline_runner.go b/backend/server/services/pipeline_runner.go index cb3d27f4a..c11390315 100644 --- a/backend/server/services/pipeline_runner.go +++ b/backend/server/services/pipeline_runner.go @@ -19,7 +19,6 @@ package services import ( "context" - "encoding/json" "fmt" "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" @@ -27,8 +26,6 @@ import ( "github.com/apache/incubator-devlake/core/models" "github.com/apache/incubator-devlake/core/runner" "github.com/apache/incubator-devlake/impls/logruslog" - "github.com/apache/incubator-devlake/worker/app" - "go.temporal.io/sdk/client" "time" ) @@ -47,37 +44,6 @@ func (p *pipelineRunner) runPipelineStandalone() errors.Error { ) } -func (p *pipelineRunner) runPipelineViaTemporal() errors.Error { - workflowOpts := client.StartWorkflowOptions{ - ID: getTemporalWorkflowId(p.pipeline.ID), - TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"), - } - // send only the very basis data - configJson, err := json.Marshal(cfg.AllSettings()) - if err != nil { - return errors.Convert(err) - } - p.logger.Info("enqueue pipeline #%d into temporal task queue", p.pipeline.ID) - workflow, err := temporalClient.ExecuteWorkflow( - context.Background(), - workflowOpts, - app.DevLakePipelineWorkflow, - configJson, - p.pipeline.ID, - p.logger.GetConfig(), - ) - if err != nil { - p.logger.Error(err, "failed to enqueue pipeline #%d into temporal", p.pipeline.ID) - return errors.Convert(err) - } - err = workflow.Get(context.Background(), nil) - if err != nil { - p.logger.Info("failed to execute pipeline #%d via temporal: %v", p.pipeline.ID, err) - } - p.logger.Info("pipeline #%d finished by temporal", p.pipeline.ID) - return errors.Convert(err) -} - // GetPipelineLogger returns logger for the pipeline func GetPipelineLogger(pipeline *models.Pipeline) log.Logger { pipelineLogger := globalPipelineLog.Nested( @@ -107,11 +73,7 @@ func runPipeline(pipelineId uint64) errors.Error { pipeline: ppl, } // run - if temporalClient != nil { - err = pipelineRun.runPipelineViaTemporal() - } else { - err = pipelineRun.runPipelineStandalone() - } + err = pipelineRun.runPipelineStandalone() isCancelled := errors.Is(err, context.Canceled) if err != nil { err = errors.Default.Wrap(err, fmt.Sprintf("Error running pipeline %d.", pipelineId)) diff --git a/backend/server/services/task.go b/backend/server/services/task.go index aab13bc29..6c19abd7b 100644 --- a/backend/server/services/task.go +++ b/backend/server/services/task.go @@ -20,7 +20,6 @@ package services import ( "context" "fmt" - "regexp" "strings" "github.com/apache/incubator-devlake/core/dal" @@ -31,7 +30,6 @@ import ( ) var taskLog = logruslog.Global.Nested("task service") -var activityPattern = regexp.MustCompile(`task #(\d+)`) // TaskQuery FIXME . type TaskQuery struct { diff --git a/backend/server/services/task_runner.go b/backend/server/services/task_runner.go index b3db5b5dc..0ad643bf3 100644 --- a/backend/server/services/task_runner.go +++ b/backend/server/services/task_runner.go @@ -25,7 +25,6 @@ import ( "github.com/apache/incubator-devlake/core/models" "github.com/apache/incubator-devlake/core/plugin" "github.com/apache/incubator-devlake/core/runner" - "strconv" "sync" ) @@ -55,23 +54,6 @@ func (rt *RunningTask) Add(taskId uint64, cancel context.CancelFunc) errors.Erro return nil } -func (rt *RunningTask) setAll(progressDetails map[uint64]*models.TaskProgressDetail) { - rt.mu.Lock() - defer rt.mu.Unlock() - // delete finished tasks - for taskId := range rt.tasks { - if _, ok := progressDetails[taskId]; !ok { - delete(rt.tasks, taskId) - } - } - for taskId, progressDetail := range progressDetails { - if _, ok := rt.tasks[taskId]; !ok { - rt.tasks[taskId] = &RunningTaskData{} - } - rt.tasks[taskId].ProgressDetail = progressDetail - } -} - // FillProgressDetailToTasks lock less times than GetProgressDetail func (rt *RunningTask) FillProgressDetailToTasks(tasks []*models.Task) { rt.mu.Lock() @@ -157,11 +139,3 @@ func updateTaskProgress(taskId uint64, progress chan plugin.RunningProgress) { runningTasks.mu.Unlock() } } - -func getTaskIdFromActivityId(activityId string) (uint64, errors.Error) { - submatches := activityPattern.FindStringSubmatch(activityId) - if len(submatches) < 2 { - return 0, errors.Default.New("activityId does not match") - } - return errors.Convert01(strconv.ParseUint(submatches[1], 10, 64)) -} diff --git a/backend/worker/app/logger.go b/backend/worker/app/logger.go deleted file mode 100644 index 7e1ca7c04..000000000 --- a/backend/worker/app/logger.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "fmt" - core "github.com/apache/incubator-devlake/core/log" - "go.temporal.io/sdk/log" -) - -// TemporalLogger FIXME ... -type TemporalLogger struct { - log core.Logger -} - -// NewTemporalLogger FIXME ... -func NewTemporalLogger(coreLogger core.Logger) log.Logger { - return &TemporalLogger{ - coreLogger, - } -} - -// Log FIXME ... -func (l *TemporalLogger) Log(lv core.LogLevel, msg string, keyvals ...interface{}) { - if l.log.IsLevelEnabled(lv) { - for i := 0; i < len(keyvals); i += 2 { - msg += fmt.Sprintf(" %s %v", keyvals[i], keyvals[i+1]) - } - l.log.Log(lv, msg) - } -} - -// Debug FIXME ... -func (l *TemporalLogger) Debug(msg string, keyvals ...interface{}) { - l.Log(core.LOG_DEBUG, msg, keyvals...) -} - -// Info FIXME ... -func (l *TemporalLogger) Info(msg string, keyvals ...interface{}) { - l.Log(core.LOG_INFO, msg, keyvals...) -} - -// Warn FIXME ... -func (l *TemporalLogger) Warn(msg string, keyvals ...interface{}) { - l.Log(core.LOG_WARN, msg, keyvals...) -} - -// Error FIXME ... -func (l *TemporalLogger) Error(msg string, keyvals ...interface{}) { - l.Log(core.LOG_ERROR, msg, keyvals...) -} diff --git a/backend/worker/app/pipeline_workflow.go b/backend/worker/app/pipeline_workflow.go deleted file mode 100644 index de6350b54..000000000 --- a/backend/worker/app/pipeline_workflow.go +++ /dev/null @@ -1,82 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "fmt" - "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/log" - "github.com/apache/incubator-devlake/core/runner" - "strings" - "time" - - "go.temporal.io/sdk/workflow" -) - -// DevLakePipelineWorkflow FIXME ... -func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte, pipelineId uint64, loggerConfig *log.LoggerConfig) errors.Error { - basicRes, err := loadResources(configJson, loggerConfig) - if err != nil { - return errors.Convert(err) - } - logger := basicRes.GetLogger() - logger.Info("received pipeline #%d", pipelineId) - err = runner.RunPipeline( - basicRes, - pipelineId, - func(taskIds []uint64) errors.Error { - return runTasks(ctx, configJson, taskIds, logger) - }, - ) - if err != nil { - logger.Error(err, "failed to execute pipeline #%d", pipelineId) - } - logger.Info("finished pipeline #%d", pipelineId) - return err -} - -func runTasks(ctx workflow.Context, configJson []byte, taskIds []uint64, logger log.Logger) errors.Error { - cleanExit := false - defer func() { - if !cleanExit { - logger.Error(nil, "fatal error while executing task Ids: %v", taskIds) - } - }() - futures := make([]workflow.Future, len(taskIds)) - for i, taskId := range taskIds { - activityOpts := workflow.ActivityOptions{ - ActivityID: fmt.Sprintf("task #%d", taskId), - StartToCloseTimeout: 24 * time.Hour, - WaitForCancellation: true, - } - activityCtx := workflow.WithActivityOptions(ctx, activityOpts) - futures[i] = workflow.ExecuteActivity(activityCtx, DevLakeTaskActivity, configJson, taskId, logger) - } - errs := make([]string, 0) - for _, future := range futures { - err := future.Get(ctx, nil) - if err != nil { - errs = append(errs, err.Error()) - } - } - cleanExit = true - if len(errs) > 0 { - return errors.Default.New(strings.Join(errs, "\n")) - } - return nil -} diff --git a/backend/worker/app/shared.go b/backend/worker/app/shared.go deleted file mode 100644 index 6b52e7801..000000000 --- a/backend/worker/app/shared.go +++ /dev/null @@ -1,63 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "bytes" - "github.com/apache/incubator-devlake/core/context" - "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/log" - "github.com/apache/incubator-devlake/core/runner" - "github.com/apache/incubator-devlake/impls/logruslog" - "github.com/spf13/viper" -) - -func loadResources(configJson []byte, loggerConfig *log.LoggerConfig) (context.BasicRes, errors.Error) { - // TODO: should be redirected to server - globalLogger := logruslog.Global.Nested("worker") - // prepare - cfg := viper.New() - cfg.SetConfigType("json") - err := cfg.ReadConfig(bytes.NewBuffer(configJson)) - if err != nil { - globalLogger.Error(err, "failed to load resources") - return nil, errors.Convert(err) - } - db, err := runner.NewGormDb(cfg, globalLogger) - if err != nil { - return nil, errors.Convert(err) - } - logger, err := getWorkerLogger(globalLogger, loggerConfig) - if err != nil { - return nil, errors.Convert(err) - } - return runner.CreateBasicRes(cfg, logger, db), nil -} - -func getWorkerLogger(logger log.Logger, logConfig *log.LoggerConfig) (log.Logger, errors.Error) { - newLogger := logger.Nested(logConfig.Prefix) - stream, err := logruslog.GetFileStream(logConfig.Path) - if err != nil { - return nil, err - } - newLogger.SetStream(&log.LoggerStreamConfig{ - Path: logConfig.Path, - Writer: stream, - }) - return newLogger, nil -} diff --git a/backend/worker/app/task_activity.go b/backend/worker/app/task_activity.go deleted file mode 100644 index 87fea1ae4..000000000 --- a/backend/worker/app/task_activity.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "context" - "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/log" - "github.com/apache/incubator-devlake/core/models" - "github.com/apache/incubator-devlake/core/plugin" - "github.com/apache/incubator-devlake/core/runner" - "go.temporal.io/sdk/activity" -) - -// DevLakeTaskActivity FIXME ... -func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId uint64, loggerConfig *log.LoggerConfig) errors.Error { - basicRes, err := loadResources(configJson, loggerConfig) - if err != nil { - return err - } - logger := basicRes.GetLogger() - logger.Info("received task #%d", taskId) - progressDetail := &models.TaskProgressDetail{} - progChan := make(chan plugin.RunningProgress) - defer close(progChan) - go func() { - for p := range progChan { - runner.UpdateProgressDetail(basicRes, taskId, progressDetail, &p) - activity.RecordHeartbeat(ctx, progressDetail) - } - }() - err = runner.RunTask(ctx, basicRes, progChan, taskId) - if err != nil { - logger.Error(err, "failed to execute task #%d", taskId) - } - logger.Info("finished task #%d", taskId) - return err -} diff --git a/backend/worker/main.go b/backend/worker/main.go deleted file mode 100644 index 58a008153..000000000 --- a/backend/worker/main.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "github.com/apache/incubator-devlake/core/errors" - "github.com/apache/incubator-devlake/core/runner" - _ "github.com/apache/incubator-devlake/core/version" - "github.com/apache/incubator-devlake/impls/logruslog" - "github.com/apache/incubator-devlake/worker/app" - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/worker" - "log" -) - -func main() { - basicRes := runner.CreateAppBasicRes() - err := runner.LoadPlugins(basicRes) - if err != nil { - panic(err) - } - - // establish temporal connection - TASK_QUEUE := basicRes.GetConfig("TEMPORAL_TASK_QUEUE") - // Create the client object just once per process - c, err := errors.Convert01(client.NewClient(client.Options{ - HostPort: basicRes.GetConfig("TEMPORAL_URL"), - Logger: app.NewTemporalLogger(logruslog.Global), - })) - if err != nil { - log.Fatalln("unable to create Temporal client", err) - } - defer c.Close() - // This worker hosts both Workflow and Activity functions - w := worker.New(c, TASK_QUEUE, worker.Options{}) - w.RegisterWorkflow(app.DevLakePipelineWorkflow) - w.RegisterActivity(app.DevLakeTaskActivity) - // Start listening to the Task Queue - err = errors.Convert(w.Run(worker.InterruptCh())) - if err != nil { - log.Fatalln("unable to start Worker", err) - } -}
