This is an automated email from the ASF dual-hosted git repository.
ka94 pushed a commit to branch issues/5939
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/issues/5939 by this push:
new 6d926bf33 refactor: removed temporal support
6d926bf33 is described below
commit 6d926bf33d975f73e0eaab471c864b1702739e19
Author: Keon Amini <[email protected]>
AuthorDate: Thu Sep 28 16:52:36 2023 -0500
refactor: removed temporal support
---
Makefile | 8 +-
backend/Makefile | 12 +--
backend/core/config/config_viper.go | 1 -
backend/go.mod | 2 -
backend/server/services/pipeline.go | 164 +++++------------------------
backend/server/services/pipeline_runner.go | 40 +------
backend/worker/app/logger.go | 66 ------------
backend/worker/app/pipeline_workflow.go | 82 ---------------
backend/worker/app/shared.go | 63 -----------
backend/worker/app/task_activity.go | 53 ----------
backend/worker/main.go | 58 ----------
11 files changed, 27 insertions(+), 522 deletions(-)
diff --git a/Makefile b/Makefile
index 1ce6c7027..166b1ae17 100644
--- a/Makefile
+++ b/Makefile
@@ -71,15 +71,12 @@ swag:
build-plugin:
make build-plugin -C backend
-build-worker:
- make build-worker -C backend
-
build-server:
make build-server -C backend
build: build-plugin build-server
-all: build build-worker
+all: build
tap-models:
make tap-models -C backend
@@ -87,9 +84,6 @@ tap-models:
run:
make run -C backend
-worker:
- make worker -C backend
-
dev:
make dev -C backend
diff --git a/backend/Makefile b/backend/Makefile
index b86c95348..48dc38725 100644
--- a/backend/Makefile
+++ b/backend/Makefile
@@ -57,13 +57,6 @@ build-plugin:
PLUGIN=$(PLUGIN) sh scripts/compile-plugins.sh; \
fi
-build-worker:
- if [ "$(DEBUG)" = "true" ]; then \
- go build -gcflags='all=-N -l' -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake-worker ./worker/; \
- else \
- go build -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake-worker ./worker/; \
- fi
-
build-server: swag
if [ "$(DEBUG)" = "true" ]; then \
go build -gcflags='all=-N -l' -ldflags "-X 'github.com/apache/incubator-devlake/core/version.Version=$(VERSION)'" -o bin/lake ./server/; \
@@ -78,14 +71,11 @@ build-python: #don't mix this with the other build commands
build: build-plugin build-server
-all: build build-worker
+all: build
run:
go run server/main.go
-worker:
- go run worker/*.go
-
dev: build-plugin build-python run
godev:
diff --git a/backend/core/config/config_viper.go b/backend/core/config/config_viper.go
index 80d1dd85d..0937f2237 100644
--- a/backend/core/config/config_viper.go
+++ b/backend/core/config/config_viper.go
@@ -102,7 +102,6 @@ func getEnvPath() string {
func setDefaultValue(v *viper.Viper) {
v.SetDefault("PORT", "8080")
v.SetDefault("PLUGIN_DIR", "bin/plugins")
- v.SetDefault("TEMPORAL_TASK_QUEUE", "DEVLAKE_TASK_QUEUE")
v.SetDefault("REMOTE_PLUGIN_DIR", "python/plugins")
v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
}
diff --git a/backend/go.mod b/backend/go.mod
index 46e741447..2f692e35a 100644
--- a/backend/go.mod
+++ b/backend/go.mod
@@ -30,8 +30,6 @@ require (
github.com/swaggo/swag v1.16.1
github.com/tidwall/gjson v1.14.3
github.com/viant/afs v1.16.0
- go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
- go.temporal.io/sdk v1.14.0
golang.org/x/crypto v0.9.0
golang.org/x/exp v0.0.0-20221028150844-83b7d23a625f
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602
diff --git a/backend/server/services/pipeline.go b/backend/server/services/pipeline.go
index 8e72901ed..97e6d06e0 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -33,14 +33,10 @@ import (
"github.com/apache/incubator-devlake/helpers/dbhelper"
"github.com/apache/incubator-devlake/impls/logruslog"
"github.com/google/uuid"
- v11 "go.temporal.io/api/enums/v1"
- "go.temporal.io/sdk/client"
- "go.temporal.io/sdk/converter"
"golang.org/x/sync/semaphore"
)
var notificationService *NotificationService
-var temporalClient client.Client
var globalPipelineLog = logruslog.Global.Nested("pipeline service")
// PipelineQuery is a query for GetPipelines
@@ -60,46 +56,32 @@ func pipelineServiceInit() {
notificationService =
NewNotificationService(notificationEndpoint, notificationSecret)
}
- // temporal client
- var temporalUrl = cfg.GetString("TEMPORAL_URL")
- if temporalUrl != "" {
- // TODO: logger
- var err error
- temporalClient, err = client.NewClient(client.Options{
- HostPort: temporalUrl,
- })
- if err != nil {
- panic(err)
- }
- watchTemporalPipelines()
- } else {
- // standalone mode: reset pipeline status
- errMsg := "The process was terminated unexpectedly"
- err := db.UpdateColumns(
- &models.Pipeline{},
- []dal.DalSet{
- {ColumnName: "status", Value:
models.TASK_FAILED},
- {ColumnName: "message", Value: errMsg},
- },
- dal.Where("status = ?", models.TASK_RUNNING),
- )
- if err != nil {
- panic(err)
- }
- err = db.UpdateColumns(
- &models.Task{},
- []dal.DalSet{
- {ColumnName: "status", Value:
models.TASK_FAILED},
- {ColumnName: "message", Value: errMsg},
- },
- dal.Where("status = ?", models.TASK_RUNNING),
- )
- if err != nil {
- panic(err)
- }
+ // standalone mode: reset pipeline status
+ errMsg := "The process was terminated unexpectedly"
+ err := db.UpdateColumns(
+ &models.Pipeline{},
+ []dal.DalSet{
+ {ColumnName: "status", Value: models.TASK_FAILED},
+ {ColumnName: "message", Value: errMsg},
+ },
+ dal.Where("status = ?", models.TASK_RUNNING),
+ )
+ if err != nil {
+ panic(err)
+ }
+ err = db.UpdateColumns(
+ &models.Task{},
+ []dal.DalSet{
+ {ColumnName: "status", Value: models.TASK_FAILED},
+ {ColumnName: "message", Value: errMsg},
+ },
+ dal.Where("status = ?", models.TASK_RUNNING),
+ )
+ if err != nil {
+ panic(err)
}
- err := ReloadBlueprints(cronManager)
+ err = ReloadBlueprints(cronManager)
if err != nil {
panic(err)
}
@@ -268,101 +250,6 @@ func RunPipelineInQueue(pipelineMaxParallel int64) {
}
}
-func watchTemporalPipelines() {
- ticker := time.NewTicker(3 * time.Second)
- dc := converter.GetDefaultDataConverter()
- go func() {
- // run forever
- for range ticker.C {
- // load all running pipeline from database
- runningDbPipelines := make([]models.Pipeline, 0)
- err := db.All(&runningDbPipelines, dal.Where("status =
?", models.TASK_RUNNING))
- if err != nil {
- panic(err)
- }
- // progressDetails will be only used in this goroutine
now
- // So it needn't lock and unlock now
- progressDetails :=
make(map[uint64]*models.TaskProgressDetail)
- // check their status against temporal
- for _, rp := range runningDbPipelines {
- workflowId := getTemporalWorkflowId(rp.ID)
- desc, err :=
temporalClient.DescribeWorkflowExecution(
- context.Background(),
- workflowId,
- "",
- )
- if err != nil {
- globalPipelineLog.Error(err, "failed to
query workflow execution: %v", err)
- continue
- }
- // workflow is terminated by outsider
- s := desc.WorkflowExecutionInfo.Status
- if s != v11.WORKFLOW_EXECUTION_STATUS_RUNNING {
- rp.Status = models.TASK_COMPLETED
- if s !=
v11.WORKFLOW_EXECUTION_STATUS_COMPLETED {
- rp.Status = models.TASK_FAILED
- // get error message
- hisIter :=
temporalClient.GetWorkflowHistory(
- context.Background(),
- workflowId,
- "",
- false,
- v11.HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
- )
- for hisIter.HasNext() {
- his, err :=
hisIter.Next()
- if err != nil {
- globalPipelineLog.Error(err, "failed to get next from workflow history iterator: %v", err)
- continue
- }
- rp.Message =
fmt.Sprintf("temporal event type: %v", his.GetEventType())
- }
- }
- rp.FinishedAt =
desc.WorkflowExecutionInfo.CloseTime
- err = db.UpdateColumns(rp, []dal.DalSet{
- {ColumnName: "status", Value:
rp.Status},
- {ColumnName: "message", Value:
rp.Message},
- {ColumnName: "finished_at",
Value: rp.FinishedAt},
- })
- if err != nil {
- globalPipelineLog.Error(err,
"failed to update db: %v", err)
- }
- continue
- }
-
- // check pending activity
- for _, activity := range desc.PendingActivities
{
- taskId, err :=
getTaskIdFromActivityId(activity.ActivityId)
- if err != nil {
- globalPipelineLog.Error(err,
"unable to extract task id from activity id `%s`", activity.ActivityId)
- continue
- }
- progressDetail :=
&models.TaskProgressDetail{}
- progressDetails[taskId] = progressDetail
- heartbeats :=
activity.GetHeartbeatDetails()
- if heartbeats == nil {
- continue
- }
- payloads := heartbeats.GetPayloads()
- if len(payloads) == 0 {
- return
- }
- lastPayload := payloads[len(payloads)-1]
- if err := dc.FromPayload(lastPayload,
progressDetail); err != nil {
- globalPipelineLog.Error(err,
"failed to unmarshal heartbeat payload: %v", err)
- continue
- }
- }
- }
- runningTasks.setAll(progressDetails)
- }
- }()
-}
-
-func getTemporalWorkflowId(pipelineId uint64) string {
- return fmt.Sprintf("pipeline #%d", pipelineId)
-}
-
// NotifyExternal FIXME ...
func NotifyExternal(pipelineId uint64) errors.Error {
if notificationService == nil {
@@ -415,9 +302,6 @@ func CancelPipeline(pipelineId uint64) errors.Error {
// the target pipeline is pending, no running, no need to
perform the actual cancel operation
return nil
}
- if temporalClient != nil {
- return errors.Convert(temporalClient.CancelWorkflow(context.Background(), getTemporalWorkflowId(pipelineId), ""))
- }
pendingTasks, count, err := GetTasks(&TaskQuery{PipelineId: pipelineId,
Pending: 1, Pagination: Pagination{PageSize: -1}})
if err != nil {
return errors.Convert(err)
diff --git a/backend/server/services/pipeline_runner.go b/backend/server/services/pipeline_runner.go
index cb3d27f4a..c11390315 100644
--- a/backend/server/services/pipeline_runner.go
+++ b/backend/server/services/pipeline_runner.go
@@ -19,7 +19,6 @@ package services
import (
"context"
- "encoding/json"
"fmt"
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
@@ -27,8 +26,6 @@ import (
"github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/runner"
"github.com/apache/incubator-devlake/impls/logruslog"
- "github.com/apache/incubator-devlake/worker/app"
- "go.temporal.io/sdk/client"
"time"
)
@@ -47,37 +44,6 @@ func (p *pipelineRunner) runPipelineStandalone() errors.Error {
)
}
-func (p *pipelineRunner) runPipelineViaTemporal() errors.Error {
- workflowOpts := client.StartWorkflowOptions{
- ID: getTemporalWorkflowId(p.pipeline.ID),
- TaskQueue: cfg.GetString("TEMPORAL_TASK_QUEUE"),
- }
- // send only the very basis data
- configJson, err := json.Marshal(cfg.AllSettings())
- if err != nil {
- return errors.Convert(err)
- }
- p.logger.Info("enqueue pipeline #%d into temporal task queue",
p.pipeline.ID)
- workflow, err := temporalClient.ExecuteWorkflow(
- context.Background(),
- workflowOpts,
- app.DevLakePipelineWorkflow,
- configJson,
- p.pipeline.ID,
- p.logger.GetConfig(),
- )
- if err != nil {
- p.logger.Error(err, "failed to enqueue pipeline #%d into
temporal", p.pipeline.ID)
- return errors.Convert(err)
- }
- err = workflow.Get(context.Background(), nil)
- if err != nil {
- p.logger.Info("failed to execute pipeline #%d via temporal:
%v", p.pipeline.ID, err)
- }
- p.logger.Info("pipeline #%d finished by temporal", p.pipeline.ID)
- return errors.Convert(err)
-}
-
// GetPipelineLogger returns logger for the pipeline
func GetPipelineLogger(pipeline *models.Pipeline) log.Logger {
pipelineLogger := globalPipelineLog.Nested(
@@ -107,11 +73,7 @@ func runPipeline(pipelineId uint64) errors.Error {
pipeline: ppl,
}
// run
- if temporalClient != nil {
- err = pipelineRun.runPipelineViaTemporal()
- } else {
- err = pipelineRun.runPipelineStandalone()
- }
+ err = pipelineRun.runPipelineStandalone()
isCancelled := errors.Is(err, context.Canceled)
if err != nil {
err = errors.Default.Wrap(err, fmt.Sprintf("Error running
pipeline %d.", pipelineId))
diff --git a/backend/worker/app/logger.go b/backend/worker/app/logger.go
deleted file mode 100644
index 7e1ca7c04..000000000
--- a/backend/worker/app/logger.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
- "fmt"
- core "github.com/apache/incubator-devlake/core/log"
- "go.temporal.io/sdk/log"
-)
-
-// TemporalLogger FIXME ...
-type TemporalLogger struct {
- log core.Logger
-}
-
-// NewTemporalLogger FIXME ...
-func NewTemporalLogger(coreLogger core.Logger) log.Logger {
- return &TemporalLogger{
- coreLogger,
- }
-}
-
-// Log FIXME ...
-func (l *TemporalLogger) Log(lv core.LogLevel, msg string, keyvals
...interface{}) {
- if l.log.IsLevelEnabled(lv) {
- for i := 0; i < len(keyvals); i += 2 {
- msg += fmt.Sprintf(" %s %v", keyvals[i], keyvals[i+1])
- }
- l.log.Log(lv, msg)
- }
-}
-
-// Debug FIXME ...
-func (l *TemporalLogger) Debug(msg string, keyvals ...interface{}) {
- l.Log(core.LOG_DEBUG, msg, keyvals...)
-}
-
-// Info FIXME ...
-func (l *TemporalLogger) Info(msg string, keyvals ...interface{}) {
- l.Log(core.LOG_INFO, msg, keyvals...)
-}
-
-// Warn FIXME ...
-func (l *TemporalLogger) Warn(msg string, keyvals ...interface{}) {
- l.Log(core.LOG_WARN, msg, keyvals...)
-}
-
-// Error FIXME ...
-func (l *TemporalLogger) Error(msg string, keyvals ...interface{}) {
- l.Log(core.LOG_ERROR, msg, keyvals...)
-}
diff --git a/backend/worker/app/pipeline_workflow.go b/backend/worker/app/pipeline_workflow.go
deleted file mode 100644
index de6350b54..000000000
--- a/backend/worker/app/pipeline_workflow.go
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
- "fmt"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/log"
- "github.com/apache/incubator-devlake/core/runner"
- "strings"
- "time"
-
- "go.temporal.io/sdk/workflow"
-)
-
-// DevLakePipelineWorkflow FIXME ...
-func DevLakePipelineWorkflow(ctx workflow.Context, configJson []byte,
pipelineId uint64, loggerConfig *log.LoggerConfig) errors.Error {
- basicRes, err := loadResources(configJson, loggerConfig)
- if err != nil {
- return errors.Convert(err)
- }
- logger := basicRes.GetLogger()
- logger.Info("received pipeline #%d", pipelineId)
- err = runner.RunPipeline(
- basicRes,
- pipelineId,
- func(taskIds []uint64) errors.Error {
- return runTasks(ctx, configJson, taskIds, logger)
- },
- )
- if err != nil {
- logger.Error(err, "failed to execute pipeline #%d", pipelineId)
- }
- logger.Info("finished pipeline #%d", pipelineId)
- return err
-}
-
-func runTasks(ctx workflow.Context, configJson []byte, taskIds []uint64,
logger log.Logger) errors.Error {
- cleanExit := false
- defer func() {
- if !cleanExit {
- logger.Error(nil, "fatal error while executing task
Ids: %v", taskIds)
- }
- }()
- futures := make([]workflow.Future, len(taskIds))
- for i, taskId := range taskIds {
- activityOpts := workflow.ActivityOptions{
- ActivityID: fmt.Sprintf("task #%d", taskId),
- StartToCloseTimeout: 24 * time.Hour,
- WaitForCancellation: true,
- }
- activityCtx := workflow.WithActivityOptions(ctx, activityOpts)
- futures[i] = workflow.ExecuteActivity(activityCtx,
DevLakeTaskActivity, configJson, taskId, logger)
- }
- errs := make([]string, 0)
- for _, future := range futures {
- err := future.Get(ctx, nil)
- if err != nil {
- errs = append(errs, err.Error())
- }
- }
- cleanExit = true
- if len(errs) > 0 {
- return errors.Default.New(strings.Join(errs, "\n"))
- }
- return nil
-}
diff --git a/backend/worker/app/shared.go b/backend/worker/app/shared.go
deleted file mode 100644
index 6b52e7801..000000000
--- a/backend/worker/app/shared.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
- "bytes"
- "github.com/apache/incubator-devlake/core/context"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/log"
- "github.com/apache/incubator-devlake/core/runner"
- "github.com/apache/incubator-devlake/impls/logruslog"
- "github.com/spf13/viper"
-)
-
-func loadResources(configJson []byte, loggerConfig *log.LoggerConfig)
(context.BasicRes, errors.Error) {
- // TODO: should be redirected to server
- globalLogger := logruslog.Global.Nested("worker")
- // prepare
- cfg := viper.New()
- cfg.SetConfigType("json")
- err := cfg.ReadConfig(bytes.NewBuffer(configJson))
- if err != nil {
- globalLogger.Error(err, "failed to load resources")
- return nil, errors.Convert(err)
- }
- db, err := runner.NewGormDb(cfg, globalLogger)
- if err != nil {
- return nil, errors.Convert(err)
- }
- logger, err := getWorkerLogger(globalLogger, loggerConfig)
- if err != nil {
- return nil, errors.Convert(err)
- }
- return runner.CreateBasicRes(cfg, logger, db), nil
-}
-
-func getWorkerLogger(logger log.Logger, logConfig *log.LoggerConfig)
(log.Logger, errors.Error) {
- newLogger := logger.Nested(logConfig.Prefix)
- stream, err := logruslog.GetFileStream(logConfig.Path)
- if err != nil {
- return nil, err
- }
- newLogger.SetStream(&log.LoggerStreamConfig{
- Path: logConfig.Path,
- Writer: stream,
- })
- return newLogger, nil
-}
diff --git a/backend/worker/app/task_activity.go b/backend/worker/app/task_activity.go
deleted file mode 100644
index 87fea1ae4..000000000
--- a/backend/worker/app/task_activity.go
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package app
-
-import (
- "context"
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/log"
- "github.com/apache/incubator-devlake/core/models"
- "github.com/apache/incubator-devlake/core/plugin"
- "github.com/apache/incubator-devlake/core/runner"
- "go.temporal.io/sdk/activity"
-)
-
-// DevLakeTaskActivity FIXME ...
-func DevLakeTaskActivity(ctx context.Context, configJson []byte, taskId
uint64, loggerConfig *log.LoggerConfig) errors.Error {
- basicRes, err := loadResources(configJson, loggerConfig)
- if err != nil {
- return err
- }
- logger := basicRes.GetLogger()
- logger.Info("received task #%d", taskId)
- progressDetail := &models.TaskProgressDetail{}
- progChan := make(chan plugin.RunningProgress)
- defer close(progChan)
- go func() {
- for p := range progChan {
- runner.UpdateProgressDetail(basicRes, taskId,
progressDetail, &p)
- activity.RecordHeartbeat(ctx, progressDetail)
- }
- }()
- err = runner.RunTask(ctx, basicRes, progChan, taskId)
- if err != nil {
- logger.Error(err, "failed to execute task #%d", taskId)
- }
- logger.Info("finished task #%d", taskId)
- return err
-}
diff --git a/backend/worker/main.go b/backend/worker/main.go
deleted file mode 100644
index 58a008153..000000000
--- a/backend/worker/main.go
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package main
-
-import (
- "github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/runner"
- _ "github.com/apache/incubator-devlake/core/version"
- "github.com/apache/incubator-devlake/impls/logruslog"
- "github.com/apache/incubator-devlake/worker/app"
- "go.temporal.io/sdk/client"
- "go.temporal.io/sdk/worker"
- "log"
-)
-
-func main() {
- basicRes := runner.CreateAppBasicRes()
- err := runner.LoadPlugins(basicRes)
- if err != nil {
- panic(err)
- }
-
- // establish temporal connection
- TASK_QUEUE := basicRes.GetConfig("TEMPORAL_TASK_QUEUE")
- // Create the client object just once per process
- c, err := errors.Convert01(client.NewClient(client.Options{
- HostPort: basicRes.GetConfig("TEMPORAL_URL"),
- Logger: app.NewTemporalLogger(logruslog.Global),
- }))
- if err != nil {
- log.Fatalln("unable to create Temporal client", err)
- }
- defer c.Close()
- // This worker hosts both Workflow and Activity functions
- w := worker.New(c, TASK_QUEUE, worker.Options{})
- w.RegisterWorkflow(app.DevLakePipelineWorkflow)
- w.RegisterActivity(app.DevLakeTaskActivity)
- // Start listening to the Task Queue
- err = errors.Convert(w.Run(worker.InterruptCh()))
- if err != nil {
- log.Fatalln("unable to start Worker", err)
- }
-}