This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch go-sdk-pushmi in repository https://gitbox.apache.org/repos/asf/airflow.git
commit bdede21f2a0e0ed4a13757d5c0851d41a46bb12b Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Tue Sep 30 17:49:15 2025 +0100 Simple sending of return value to XCom This right now is very simple and will need to evolve over time -- it doesn't support XCom backends for instance --- go-sdk/bundle/bundlev1/registry.go | 1 - go-sdk/bundle/bundlev1/task.go | 24 +- go-sdk/example/bundle/main.go | 12 +- go-sdk/pkg/api/client.gen.go | 1203 ++++++++++++++++++++++-------------- go-sdk/pkg/api/models.go | 2 + go-sdk/pkg/edgeapi/client.gen.go | 274 ++++---- go-sdk/pkg/worker/runner.go | 4 + go-sdk/pkg/worker/task.go | 158 ----- go-sdk/pkg/worker/task_test.go | 119 ---- go-sdk/sdk/client.go | 56 +- go-sdk/sdk/errors.go | 2 + go-sdk/sdk/sdk.go | 21 + 12 files changed, 1006 insertions(+), 870 deletions(-) diff --git a/go-sdk/bundle/bundlev1/registry.go b/go-sdk/bundle/bundlev1/registry.go index 494983f66d9..8d902efa081 100644 --- a/go-sdk/bundle/bundlev1/registry.go +++ b/go-sdk/bundle/bundlev1/registry.go @@ -28,7 +28,6 @@ import ( ) type ( - // task is an interface of an task implementation. 
Task = worker.Task Bundle = worker.Bundle diff --git a/go-sdk/bundle/bundlev1/task.go b/go-sdk/bundle/bundlev1/task.go index 1486d4e474a..af2b5529bbf 100644 --- a/go-sdk/bundle/bundlev1/task.go +++ b/go-sdk/bundle/bundlev1/task.go @@ -24,6 +24,8 @@ import ( "reflect" "runtime" + "github.com/apache/airflow/go-sdk/pkg/api" + "github.com/apache/airflow/go-sdk/pkg/sdkcontext" "github.com/apache/airflow/go-sdk/sdk" ) @@ -43,6 +45,7 @@ func NewTaskFunction(fn any) (Task, error) { func (f *taskFunction) Execute(ctx context.Context, logger *slog.Logger) error { fnType := f.fn.Type() + sdkClient := sdk.NewClient() reflectArgs := make([]reflect.Value, fnType.NumIn()) for i := range reflectArgs { @@ -54,7 +57,7 @@ func (f *taskFunction) Execute(ctx context.Context, logger *slog.Logger) error { case isLogger(in): reflectArgs[i] = reflect.ValueOf(logger) case isClient(in): - reflectArgs[i] = reflect.ValueOf(sdk.NewClient()) + reflectArgs[i] = reflect.ValueOf(sdkClient) default: // TODO: deal with other value types. 
For now they will all be Zero values unless it's a context reflectArgs[i] = reflect.Zero(in) @@ -74,15 +77,26 @@ func (f *taskFunction) Execute(ctx context.Context, logger *slog.Logger) error { } } // If there are two results, convert the first only if it's not a nil pointer - var res any if len(retValues) > 1 && (retValues[0].Kind() != reflect.Ptr || !retValues[0].IsNil()) { - res = retValues[0].Interface() + res := retValues[0].Interface() + f.sendXcom(ctx, res, sdkClient, logger) } - // TODO: send the result to XCom - _ = res return err } +func (f *taskFunction) sendXcom( + ctx context.Context, + value any, + c sdk.XComClient, + logger *slog.Logger, +) { + workload := ctx.Value(sdkcontext.WorkloadContextKey).(api.ExecuteTaskWorkload) + err := c.PutXComForTi(ctx, workload.TI, api.XComReturnValueKey, value) + if err != nil { + logger.ErrorContext(ctx, "Unable to set XCom", "err", err) + } +} + func (f *taskFunction) validateFn(fnType reflect.Type) error { if fnType.Kind() != reflect.Func { return fmt.Errorf("expected a func as input but was %s", fnType.Kind()) diff --git a/go-sdk/example/bundle/main.go b/go-sdk/example/bundle/main.go index f55794416d1..a53f7756382 100644 --- a/go-sdk/example/bundle/main.go +++ b/go-sdk/example/bundle/main.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "log/slog" + "runtime" "time" v1 "github.com/apache/airflow/go-sdk/bundle/bundlev1" @@ -56,7 +57,7 @@ func main() { bundlev1server.Serve(&myBundle{}) } -func extract(ctx context.Context, client sdk.Client, log *slog.Logger) error { +func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) { log.Info("Hello from task") conn, err := client.GetConnection(ctx, "test_http") if err != nil { @@ -69,7 +70,7 @@ func extract(ctx context.Context, client sdk.Client, log *slog.Logger) error { // Once per loop,.check if we've been asked to cancel! 
select { case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() default: } log.Info("After the beep the time will be", "time", time.Now()) @@ -77,7 +78,12 @@ func extract(ctx context.Context, client sdk.Client, log *slog.Logger) error { } log.Info("Goodbye from task") - return nil + ret := map[string]any{ + "go_version": runtime.Version(), + } + + runtime.Breakpoint() + return ret, nil } func transform(ctx context.Context, client sdk.VariableClient, log *slog.Logger) error { diff --git a/go-sdk/pkg/api/client.gen.go b/go-sdk/pkg/api/client.gen.go index 559c9740339..3f97c93691a 100644 --- a/go-sdk/pkg/api/client.gen.go +++ b/go-sdk/pkg/api/client.gen.go @@ -1,6 +1,6 @@ // Package api provides primitives to interact with the openapi HTTP API. // -// Code generated by github.com/ashb/oapi-resty-codegen version v0.0.0-20250813054149-69b488b842b5 DO NOT EDIT. +// Code generated by github.com/ashb/oapi-resty-codegen version v0.0.0-20250930162853-bec2fc27e468 DO NOT EDIT. // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file // distributed with this work for additional information @@ -25,6 +25,7 @@ import ( "encoding/json" "fmt" "net/http" + "net/url" "strings" "time" @@ -1506,7 +1507,6 @@ func HandleError(client *resty.Client, resp *resty.Response) error { e := GeneralHTTPError{Response: resp} - // fmt.Printf("HandleError: IsRead: %v Error: %T\n", resp.IsRead, resp.Error()) e.Text = resp.String() if resp.Header().Get("content-type") == "application/json" { if json.Unmarshal([]byte(e.Text), &e.JSON) == nil { @@ -1526,26 +1526,43 @@ type assetEventsClient struct { // GetByAssetNameUriResponse performs the HTTP request and returns the lower level [resty.Response] func (c *assetEventsClient) GetByAssetNameUriResponse(ctx context.Context, params *GetAssetEventByAssetNameUriParams) (resp *resty.Response, err error) { + + if params == nil { + return nil, fmt.Errorf("GetByAssetNameUri requires a non-nil params argument") + } var res AssetEventsResponse - var param0 string + req := c.R().SetContext(ctx).SetResult(&res) + + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "name", runtime.ParamLocationQuery, params.Name) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "name", runtime.ParamLocationQuery, params.Name) - if err != nil { - return nil, err } - var param1 string - param1, err = runtime.StyleParamWithLocation("form", true, "uri", runtime.ParamLocationQuery, params.Uri) - if err != nil { - return nil, err + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "uri", runtime.ParamLocationQuery, params.Uri) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - resp, err = c.R(). - SetContext(ctx). - SetQueryParam("name", param0). - SetQueryParam("uri", param1). 
- SetResult(&res). + resp, err = req. Get("asset-events/by-asset") if err != nil { return resp, err @@ -1564,19 +1581,29 @@ func (c *assetEventsClient) GetByAssetNameUri(ctx context.Context, params *GetAs // GetByAssetAliasResponse performs the HTTP request and returns the lower level [resty.Response] func (c *assetEventsClient) GetByAssetAliasResponse(ctx context.Context, params *GetAssetEventByAssetAliasParams) (resp *resty.Response, err error) { + + if params == nil { + return nil, fmt.Errorf("GetByAssetAlias requires a non-nil params argument") + } var res AssetEventsResponse - var param0 string + req := c.R().SetContext(ctx).SetResult(&res) + + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "name", runtime.ParamLocationQuery, params.Name) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "name", runtime.ParamLocationQuery, params.Name) - if err != nil { - return nil, err } - resp, err = c.R(). - SetContext(ctx). - SetQueryParam("name", param0). - SetResult(&res). + resp, err = req. 
Get("asset-events/by-asset-alias") if err != nil { return resp, err @@ -1599,19 +1626,29 @@ type assetsClient struct { // GetByNameResponse performs the HTTP request and returns the lower level [resty.Response] func (c *assetsClient) GetByNameResponse(ctx context.Context, params *GetAssetByNameParams) (resp *resty.Response, err error) { + + if params == nil { + return nil, fmt.Errorf("GetByName requires a non-nil params argument") + } var res AssetResponse - var param0 string + req := c.R().SetContext(ctx).SetResult(&res) + + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "name", runtime.ParamLocationQuery, params.Name) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "name", runtime.ParamLocationQuery, params.Name) - if err != nil { - return nil, err } - resp, err = c.R(). - SetContext(ctx). - SetQueryParam("name", param0). - SetResult(&res). + resp, err = req. 
Get("assets/by-name") if err != nil { return resp, err @@ -1630,19 +1667,29 @@ func (c *assetsClient) GetByName(ctx context.Context, params *GetAssetByNamePara // GetByUriResponse performs the HTTP request and returns the lower level [resty.Response] func (c *assetsClient) GetByUriResponse(ctx context.Context, params *GetAssetByUriParams) (resp *resty.Response, err error) { + + if params == nil { + return nil, fmt.Errorf("GetByUri requires a non-nil params argument") + } var res AssetResponse - var param0 string + req := c.R().SetContext(ctx).SetResult(&res) + + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "uri", runtime.ParamLocationQuery, params.Uri) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "uri", runtime.ParamLocationQuery, params.Uri) - if err != nil { - return nil, err } - resp, err = c.R(). - SetContext(ctx). - SetQueryParam("uri", param0). - SetResult(&res). + resp, err = req. Get("assets/by-uri") if err != nil { return resp, err @@ -1665,19 +1712,20 @@ type connectionsClient struct { // GetResponse performs the HTTP request and returns the lower level [resty.Response] func (c *connectionsClient) GetResponse(ctx context.Context, connectionId string) (resp *resty.Response, err error) { + var res ConnectionResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "connection_id", runtime.ParamLocationPath, connectionId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "connection_id", runtime.ParamLocationPath, connectionId) + if err != nil { + return nil, err + } + req.SetPathParam("connection_id", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("connection_id", pathParam0). - SetResult(&res). 
+ resp, err = req. Get("connections/{connection_id}") if err != nil { return resp, err @@ -1700,40 +1748,71 @@ type dagRunsClient struct { // GetDrCountResponse performs the HTTP request and returns the lower level [resty.Response] func (c *dagRunsClient) GetDrCountResponse(ctx context.Context, params *GetDrCountParams) (resp *resty.Response, err error) { + + if params == nil { + return nil, fmt.Errorf("GetDrCount requires a non-nil params argument") + } var res int - var param0 string + req := c.R().SetContext(ctx).SetResult(&res) + + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "dag_id", runtime.ParamLocationQuery, params.DagId) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "dag_id", runtime.ParamLocationQuery, params.DagId) - if err != nil { - return nil, err } - var param1 string - param1, err = runtime.StyleParamWithLocation("form", true, "logical_dates", runtime.ParamLocationQuery, params.LogicalDates) - if err != nil { - return nil, err + if params.LogicalDates != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "logical_dates", runtime.ParamLocationQuery, params.LogicalDates) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param2 string - param2, err = runtime.StyleParamWithLocation("form", true, "run_ids", runtime.ParamLocationQuery, params.RunIds) - if err != nil { - return nil, err + if params.RunIds != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "run_ids", runtime.ParamLocationQuery, params.RunIds) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param3 string - param3, err = 
runtime.StyleParamWithLocation("form", true, "states", runtime.ParamLocationQuery, params.States) - if err != nil { - return nil, err + if params.States != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "states", runtime.ParamLocationQuery, params.States) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - resp, err = c.R(). - SetContext(ctx). - SetQueryParam("dag_id", param0). - SetQueryParam("logical_dates", param1). - SetQueryParam("run_ids", param2). - SetQueryParam("states", param3). - SetResult(&res). + resp, err = req. Get("dag-runs/count") if err != nil { return resp, err @@ -1753,26 +1832,29 @@ func (c *dagRunsClient) GetDrCount(ctx context.Context, params *GetDrCountParams // TriggerResponse performs the HTTP request and returns the lower level [resty.Response] func (c *dagRunsClient) TriggerResponse(ctx context.Context, dagId string, runId string, body *TriggerDAGRunPayload) (resp *resty.Response, err error) { - var pathParam0 string + req := c.R().SetContext(ctx) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } if body == nil { return nil, fmt.Errorf("Trigger requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). 
- SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). + resp, err = req. + SetContentType("application/json"). SetBody(body). Post("dag-runs/{dag_id}/{run_id}") if err != nil { @@ -1789,23 +1871,25 @@ func (c *dagRunsClient) Trigger(ctx context.Context, dagId string, runId string, // ClearResponse performs the HTTP request and returns the lower level [resty.Response] func (c *dagRunsClient) ClearResponse(ctx context.Context, dagId string, runId string) (resp *resty.Response, err error) { - var pathParam0 string + req := c.R().SetContext(ctx) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). + resp, err = req. 
Post("dag-runs/{dag_id}/{run_id}/clear") if err != nil { return resp, err @@ -1820,26 +1904,28 @@ func (c *dagRunsClient) Clear(ctx context.Context, dagId string, runId string) e // GetStateResponse performs the HTTP request and returns the lower level [resty.Response] func (c *dagRunsClient) GetStateResponse(ctx context.Context, dagId string, runId string) (resp *resty.Response, err error) { + var res DagRunStateResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). - SetResult(&res). + resp, err = req. 
Get("dag-runs/{dag_id}/{run_id}/state") if err != nil { return resp, err @@ -1862,61 +1948,113 @@ type taskInstancesClient struct { // GetCountResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) GetCountResponse(ctx context.Context, params *GetTaskInstanceCountParams) (resp *resty.Response, err error) { + + if params == nil { + return nil, fmt.Errorf("GetCount requires a non-nil params argument") + } var res int - var param0 string + req := c.R().SetContext(ctx).SetResult(&res) + + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "dag_id", runtime.ParamLocationQuery, params.DagId) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "dag_id", runtime.ParamLocationQuery, params.DagId) - if err != nil { - return nil, err } - var param1 string - param1, err = runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) - if err != nil { - return nil, err + if params.MapIndex != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param2 string - param2, err = runtime.StyleParamWithLocation("form", true, "task_ids", runtime.ParamLocationQuery, params.TaskIds) - if err != nil { - return nil, err + if params.TaskIds != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "task_ids", runtime.ParamLocationQuery, params.TaskIds) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param3 string - param3, err = 
runtime.StyleParamWithLocation("form", true, "task_group_id", runtime.ParamLocationQuery, params.TaskGroupId) - if err != nil { - return nil, err + if params.TaskGroupId != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "task_group_id", runtime.ParamLocationQuery, params.TaskGroupId) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param4 string - param4, err = runtime.StyleParamWithLocation("form", true, "logical_dates", runtime.ParamLocationQuery, params.LogicalDates) - if err != nil { - return nil, err + if params.LogicalDates != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "logical_dates", runtime.ParamLocationQuery, params.LogicalDates) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param5 string - param5, err = runtime.StyleParamWithLocation("form", true, "run_ids", runtime.ParamLocationQuery, params.RunIds) - if err != nil { - return nil, err + if params.RunIds != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "run_ids", runtime.ParamLocationQuery, params.RunIds) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param6 string - param6, err = runtime.StyleParamWithLocation("form", true, "states", runtime.ParamLocationQuery, params.States) - if err != nil { - return nil, err + if params.States != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "states", runtime.ParamLocationQuery, params.States) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - resp, err = c.R(). - SetContext(ctx). 
- SetQueryParam("dag_id", param0). - SetQueryParam("map_index", param1). - SetQueryParam("task_ids", param2). - SetQueryParam("task_group_id", param3). - SetQueryParam("logical_dates", param4). - SetQueryParam("run_ids", param5). - SetQueryParam("states", param6). - SetResult(&res). + resp, err = req. Get("task-instances/count") if err != nil { return resp, err @@ -1935,54 +2073,99 @@ func (c *taskInstancesClient) GetCount(ctx context.Context, params *GetTaskInsta // GetStatesResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) GetStatesResponse(ctx context.Context, params *GetTaskInstanceStatesParams) (resp *resty.Response, err error) { + + if params == nil { + return nil, fmt.Errorf("GetStates requires a non-nil params argument") + } var res TaskStatesResponse - var param0 string + req := c.R().SetContext(ctx).SetResult(&res) + + { + + exploded, err := runtime.StyleParamWithLocation("form", true, "dag_id", runtime.ParamLocationQuery, params.DagId) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "dag_id", runtime.ParamLocationQuery, params.DagId) - if err != nil { - return nil, err } - var param1 string - param1, err = runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) - if err != nil { - return nil, err + if params.MapIndex != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param2 string - param2, err = runtime.StyleParamWithLocation("form", true, "task_ids", runtime.ParamLocationQuery, params.TaskIds) - if err != nil { - return nil, err + if 
params.TaskIds != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "task_ids", runtime.ParamLocationQuery, params.TaskIds) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param3 string - param3, err = runtime.StyleParamWithLocation("form", true, "task_group_id", runtime.ParamLocationQuery, params.TaskGroupId) - if err != nil { - return nil, err + if params.TaskGroupId != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "task_group_id", runtime.ParamLocationQuery, params.TaskGroupId) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param4 string - param4, err = runtime.StyleParamWithLocation("form", true, "logical_dates", runtime.ParamLocationQuery, params.LogicalDates) - if err != nil { - return nil, err + if params.LogicalDates != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "logical_dates", runtime.ParamLocationQuery, params.LogicalDates) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param5 string - param5, err = runtime.StyleParamWithLocation("form", true, "run_ids", runtime.ParamLocationQuery, params.RunIds) - if err != nil { - return nil, err + if params.RunIds != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "run_ids", runtime.ParamLocationQuery, params.RunIds) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - resp, err = c.R(). - SetContext(ctx). - SetQueryParam("dag_id", param0). - SetQueryParam("map_index", param1). - SetQueryParam("task_ids", param2). - SetQueryParam("task_group_id", param3). 
- SetQueryParam("logical_dates", param4). - SetQueryParam("run_ids", param5). - SetResult(&res). + resp, err = req. Get("task-instances/states") if err != nil { return resp, err @@ -2002,19 +2185,21 @@ func (c *taskInstancesClient) GetStates(ctx context.Context, params *GetTaskInst // HeartbeatResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) HeartbeatResponse(ctx context.Context, taskInstanceId openapi_types.UUID, body *TIHeartbeatInfo) (resp *resty.Response, err error) { - var pathParam0 string + req := c.R().SetContext(ctx) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } if body == nil { return nil, fmt.Errorf("Heartbeat requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("task_instance_id", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). 
Put("task-instances/{task_instance_id}/heartbeat") if err != nil { @@ -2030,19 +2215,20 @@ func (c *taskInstancesClient) Heartbeat(ctx context.Context, taskInstanceId open // GetPreviousSuccessfulDagrunResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) GetPreviousSuccessfulDagrunResponse(ctx context.Context, taskInstanceId openapi_types.UUID) (resp *resty.Response, err error) { + var res PrevSuccessfulDagRunResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("task_instance_id", pathParam0). - SetResult(&res). + resp, err = req. 
Get("task-instances/{task_instance_id}/previous-successful-dagrun") if err != nil { return resp, err @@ -2061,23 +2247,25 @@ func (c *taskInstancesClient) GetPreviousSuccessfulDagrun(ctx context.Context, t // PutRenderedFieldsResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) PutRenderedFieldsResponse(ctx context.Context, taskInstanceId openapi_types.UUID, body *map[string]*JsonValue) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } if body == nil { return nil, fmt.Errorf("PutRenderedFields requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("task_instance_id", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). 
Put("task-instances/{task_instance_id}/rtif") if err != nil { return resp, err @@ -2096,23 +2284,25 @@ func (c *taskInstancesClient) PutRenderedFields(ctx context.Context, taskInstanc // RunResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) RunResponse(ctx context.Context, taskInstanceId openapi_types.UUID, body *TIEnterRunningPayload) (resp *resty.Response, err error) { + var res TIRunContext - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } if body == nil { return nil, fmt.Errorf("Run requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("task_instance_id", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). 
Patch("task-instances/{task_instance_id}/run") if err != nil { return resp, err @@ -2132,19 +2322,21 @@ func (c *taskInstancesClient) Run(ctx context.Context, taskInstanceId openapi_ty // SkipDownstreamResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) SkipDownstreamResponse(ctx context.Context, taskInstanceId openapi_types.UUID, body *TISkippedDownstreamTasksStatePayload) (resp *resty.Response, err error) { - var pathParam0 string + req := c.R().SetContext(ctx) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } if body == nil { return nil, fmt.Errorf("SkipDownstream requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("task_instance_id", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). 
Patch("task-instances/{task_instance_id}/skip-downstream") if err != nil { @@ -2161,19 +2353,21 @@ func (c *taskInstancesClient) SkipDownstream(ctx context.Context, taskInstanceId // UpdateStateResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) UpdateStateResponse(ctx context.Context, taskInstanceId openapi_types.UUID, body *TIUpdateStatePayload) (resp *resty.Response, err error) { - var pathParam0 string + req := c.R().SetContext(ctx) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } if body == nil { return nil, fmt.Errorf("UpdateState requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("task_instance_id", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). 
Patch("task-instances/{task_instance_id}/state") if err != nil { @@ -2189,19 +2383,20 @@ func (c *taskInstancesClient) UpdateState(ctx context.Context, taskInstanceId op // ValidateInletsAndOutletsResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskInstancesClient) ValidateInletsAndOutletsResponse(ctx context.Context, taskInstanceId openapi_types.UUID) (resp *resty.Response, err error) { + var res InactiveAssetsResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("task_instance_id", pathParam0). - SetResult(&res). + resp, err = req. Get("task-instances/{task_instance_id}/validate-inlets-and-outlets") if err != nil { return resp, err @@ -2224,19 +2419,20 @@ type taskReschedulesClient struct { // GetStartDateResponse performs the HTTP request and returns the lower level [resty.Response] func (c *taskReschedulesClient) GetStartDateResponse(ctx context.Context, taskInstanceId openapi_types.UUID) (resp *resty.Response, err error) { + var res time.Time - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_instance_id", runtime.ParamLocationPath, taskInstanceId) + if err != nil { + return nil, err + } + req.SetPathParam("task_instance_id", pathParam) } - resp, err = c.R(). - SetContext(ctx). 
- SetPathParam("task_instance_id", pathParam0). - SetResult(&res). + resp, err = req. Get("task-reschedules/{task_instance_id}/start_date") if err != nil { return resp, err @@ -2259,19 +2455,20 @@ type variablesClient struct { // DeleteResponse performs the HTTP request and returns the lower level [resty.Response] func (c *variablesClient) DeleteResponse(ctx context.Context, variableKey string) (resp *resty.Response, err error) { + var res HTTPValidationError - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "variable_key", runtime.ParamLocationPath, variableKey) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "variable_key", runtime.ParamLocationPath, variableKey) + if err != nil { + return nil, err + } + req.SetPathParam("variable_key", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("variable_key", pathParam0). - SetResult(&res). + resp, err = req. Delete("variables/{variable_key}") if err != nil { return resp, err @@ -2290,19 +2487,20 @@ func (c *variablesClient) Delete(ctx context.Context, variableKey string) (*HTTP // GetResponse performs the HTTP request and returns the lower level [resty.Response] func (c *variablesClient) GetResponse(ctx context.Context, variableKey string) (resp *resty.Response, err error) { + var res VariableResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "variable_key", runtime.ParamLocationPath, variableKey) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "variable_key", runtime.ParamLocationPath, variableKey) + if err != nil { + return nil, err + } + req.SetPathParam("variable_key", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("variable_key", pathParam0). - SetResult(&res). 
+ resp, err = req. Get("variables/{variable_key}") if err != nil { return resp, err @@ -2321,23 +2519,25 @@ func (c *variablesClient) Get(ctx context.Context, variableKey string) (*Variabl // PutResponse performs the HTTP request and returns the lower level [resty.Response] func (c *variablesClient) PutResponse(ctx context.Context, variableKey string, body *VariablePostBody) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "variable_key", runtime.ParamLocationPath, variableKey) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "variable_key", runtime.ParamLocationPath, variableKey) + if err != nil { + return nil, err + } + req.SetPathParam("variable_key", pathParam) } if body == nil { return nil, fmt.Errorf("Put requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("variable_key", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). 
Put("variables/{variable_key}") if err != nil { return resp, err @@ -2360,48 +2560,58 @@ type xcomsClient struct { // DeleteResponse performs the HTTP request and returns the lower level [resty.Response] func (c *xcomsClient) DeleteResponse(ctx context.Context, dagId string, runId string, taskId string, key string, params *DeleteXcomParams) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam2 string - pathParam2, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) + if err != nil { + return nil, err + } + req.SetPathParam("key", pathParam) } - var param0 string + if params != nil && params.MapIndex != nil { + + 
exploded, err := runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) - if err != nil { - return nil, err } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). - SetPathParam("task_id", pathParam2). - SetPathParam("key", pathParam3). - SetQueryParam("map_index", param0). - SetResult(&res). + resp, err = req. Delete("xcoms/{dag_id}/{run_id}/{task_id}/{key}") if err != nil { return resp, err @@ -2420,62 +2630,86 @@ func (c *xcomsClient) Delete(ctx context.Context, dagId string, runId string, ta // GetResponse performs the HTTP request and returns the lower level [resty.Response] func (c *xcomsClient) GetResponse(ctx context.Context, dagId string, runId string, taskId string, key string, params *GetXcomParams) (resp *resty.Response, err error) { + var res XComResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam2 string - pathParam2, err = 
runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) + if err != nil { + return nil, err + } + req.SetPathParam("key", pathParam) } - var param0 string + if params != nil && params.MapIndex != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) - if err != nil { - return nil, err } - var param1 string - param1, err = runtime.StyleParamWithLocation("form", true, "include_prior_dates", runtime.ParamLocationQuery, params.IncludePriorDates) - if err != nil { - return nil, err + if params != nil && params.IncludePriorDates != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "include_prior_dates", runtime.ParamLocationQuery, params.IncludePriorDates) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param2 string - param2, err = runtime.StyleParamWithLocation("form", true, "offset", runtime.ParamLocationQuery, params.Offset) - if err != nil { - return nil, err + if params != nil && params.Offset != nil { + + exploded, err := 
runtime.StyleParamWithLocation("form", true, "offset", runtime.ParamLocationQuery, params.Offset) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). - SetPathParam("task_id", pathParam2). - SetPathParam("key", pathParam3). - SetQueryParam("map_index", param0). - SetQueryParam("include_prior_dates", param1). - SetQueryParam("offset", param2). - SetResult(&res). + resp, err = req. Get("xcoms/{dag_id}/{run_id}/{task_id}/{key}") if err != nil { return resp, err @@ -2494,48 +2728,58 @@ func (c *xcomsClient) Get(ctx context.Context, dagId string, runId string, taskI // HeadResponse performs the HTTP request and returns the lower level [resty.Response] func (c *xcomsClient) HeadResponse(ctx context.Context, dagId string, runId string, taskId string, key string, params *HeadXcomParams) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam2 string - pathParam2, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - 
if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) + if err != nil { + return nil, err + } + req.SetPathParam("key", pathParam) } - var param0 string + if params != nil && params.MapIndex != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) - if err != nil { - return nil, err } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). - SetPathParam("task_id", pathParam2). - SetPathParam("key", pathParam3). - SetQueryParam("map_index", param0). - SetResult(&res). + resp, err = req. 
Head("xcoms/{dag_id}/{run_id}/{task_id}/{key}") if err != nil { return resp, err @@ -2554,59 +2798,77 @@ func (c *xcomsClient) Head(ctx context.Context, dagId string, runId string, task // SetResponse performs the HTTP request and returns the lower level [resty.Response] func (c *xcomsClient) SetResponse(ctx context.Context, dagId string, runId string, taskId string, key string, params *SetXcomParams, body *interface{}) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam2 string - pathParam2, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) + if err != nil { + return nil, err + } + req.SetPathParam("key", pathParam) 
} - var param0 string + if params != nil && params.MapIndex != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "map_index", runtime.ParamLocationQuery, params.MapIndex) - if err != nil { - return nil, err } - var param1 string - param1, err = runtime.StyleParamWithLocation("form", true, "mapped_length", runtime.ParamLocationQuery, params.MappedLength) - if err != nil { - return nil, err + if params != nil && params.MappedLength != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "mapped_length", runtime.ParamLocationQuery, params.MappedLength) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } if body == nil { return nil, fmt.Errorf("Set requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). - SetPathParam("task_id", pathParam2). - SetPathParam("key", pathParam3). - SetQueryParam("map_index", param0). - SetQueryParam("mapped_length", param1). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). 
Post("xcoms/{dag_id}/{run_id}/{task_id}/{key}") if err != nil { return resp, err @@ -2625,47 +2887,52 @@ func (c *xcomsClient) Set(ctx context.Context, dagId string, runId string, taskI // GetMappedByIndexResponse performs the HTTP request and returns the lower level [resty.Response] func (c *xcomsClient) GetMappedByIndexResponse(ctx context.Context, dagId string, runId string, taskId string, key string, offset int) (resp *resty.Response, err error) { + var res XComSequenceIndexResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam2 string - pathParam2, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) + if err != nil { + return nil, err + } + req.SetPathParam("key", 
pathParam) } - var pathParam4 string - pathParam4, err = runtime.StyleParamWithLocation("simple", false, "offset", runtime.ParamLocationPath, offset) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "offset", runtime.ParamLocationPath, offset) + if err != nil { + return nil, err + } + req.SetPathParam("offset", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). - SetPathParam("task_id", pathParam2). - SetPathParam("key", pathParam3). - SetPathParam("offset", pathParam4). - SetResult(&res). + resp, err = req. Get("xcoms/{dag_id}/{run_id}/{task_id}/{key}/item/{offset}") if err != nil { return resp, err @@ -2684,62 +2951,86 @@ func (c *xcomsClient) GetMappedByIndex(ctx context.Context, dagId string, runId // GetMappedBySliceResponse performs the HTTP request and returns the lower level [resty.Response] func (c *xcomsClient) GetMappedBySliceResponse(ctx context.Context, dagId string, runId string, taskId string, key string, params *GetMappedXcomBySliceParams) (resp *resty.Response, err error) { + var res XComSequenceSliceResponse - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam2 string - 
pathParam2, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "key", runtime.ParamLocationPath, key) + if err != nil { + return nil, err + } + req.SetPathParam("key", pathParam) } - var param0 string + if params != nil && params.Start != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "start", runtime.ParamLocationQuery, params.Start) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) - param0, err = runtime.StyleParamWithLocation("form", true, "start", runtime.ParamLocationQuery, params.Start) - if err != nil { - return nil, err } - var param1 string - param1, err = runtime.StyleParamWithLocation("form", true, "stop", runtime.ParamLocationQuery, params.Stop) - if err != nil { - return nil, err + if params != nil && params.Stop != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "stop", runtime.ParamLocationQuery, params.Stop) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - var param2 string - param2, err = runtime.StyleParamWithLocation("form", true, "step", runtime.ParamLocationQuery, params.Step) - if err != nil { - return nil, err + if params != nil && params.Step != nil { + + exploded, err := runtime.StyleParamWithLocation("form", true, "step", runtime.ParamLocationQuery, 
params.Step) + if err != nil { + return nil, err + } + vals, err := url.ParseQuery(exploded) + if err != nil { + return nil, err + } + req.SetQueryParamsFromValues(vals) + } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("run_id", pathParam1). - SetPathParam("task_id", pathParam2). - SetPathParam("key", pathParam3). - SetQueryParam("start", param0). - SetQueryParam("stop", param1). - SetQueryParam("step", param2). - SetResult(&res). + resp, err = req. Get("xcoms/{dag_id}/{run_id}/{task_id}/{key}/slice") if err != nil { return resp, err diff --git a/go-sdk/pkg/api/models.go b/go-sdk/pkg/api/models.go index 1dba6b07d41..a91eecd09b7 100644 --- a/go-sdk/pkg/api/models.go +++ b/go-sdk/pkg/api/models.go @@ -23,3 +23,5 @@ type ExecuteTaskWorkload struct { BundleInfo BundleInfo `json:"bundle_info"` LogPath *string `json:"log_path,omitempty"` } + +const XComReturnValueKey = "return_value" diff --git a/go-sdk/pkg/edgeapi/client.gen.go b/go-sdk/pkg/edgeapi/client.gen.go index ab1f4896327..342bb36a58a 100644 --- a/go-sdk/pkg/edgeapi/client.gen.go +++ b/go-sdk/pkg/edgeapi/client.gen.go @@ -1,6 +1,6 @@ // Package edgeapi provides primitives to interact with the openapi HTTP API. // -// Code generated by github.com/ashb/oapi-resty-codegen version v0.0.0-20250926211356-86b7321337dd DO NOT EDIT. +// Code generated by github.com/ashb/oapi-resty-codegen version v0.0.0-20250930162853-bec2fc27e468 DO NOT EDIT. // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. 
See the NOTICE file // distributed with this work for additional information @@ -669,11 +669,9 @@ func WithRequestMiddleware(mw resty.RequestMiddleware) ClientOption { func (c *Client) Jobs() JobsClient { return &jobsClient{c.Client} } - func (c *Client) Logs() LogsClient { return &logsClient{c.Client} } - func (c *Client) Worker() WorkerClient { return &workerClient{c.Client} } @@ -741,6 +739,7 @@ type jobsClient struct { // FetchResponse performs the HTTP request and returns the lower level [resty.Response] func (c *jobsClient) FetchResponse(ctx context.Context, workerName string, body *WorkerQueuesBody) (resp *resty.Response, err error) { + var res struct { // Command Execute the given Task. Command ExecuteTask `json:"command"` @@ -764,21 +763,22 @@ func (c *jobsClient) FetchResponse(ctx context.Context, workerName string, body TryNumber int `json:"try_number"` } - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) + if err != nil { + return nil, err + } + req.SetPathParam("worker_name", pathParam) } if body == nil { return nil, fmt.Errorf("Fetch requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("worker_name", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). Post("edge_worker/v1/jobs/fetch/{worker_name}") if err != nil { return resp, err @@ -807,8 +807,7 @@ func (c *jobsClient) Fetch(ctx context.Context, workerName string, body *WorkerQ // TryNumber The number of attempt to execute this task. 
TryNumber int `json:"try_number"` -}, error, -) { +}, error) { res, err := c.FetchResponse(ctx, workerName, body) if err != nil { return nil, err @@ -840,54 +839,60 @@ func (c *jobsClient) Fetch(ctx context.Context, workerName string, body *WorkerQ // StateResponse performs the HTTP request and returns the lower level [resty.Response] func (c *jobsClient) StateResponse(ctx context.Context, dagId string, taskId string, runId string, tryNumber int, mapIndex int, state TaskInstanceState) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam2 string - pathParam2, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "try_number", runtime.ParamLocationPath, tryNumber) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "try_number", 
runtime.ParamLocationPath, tryNumber) + if err != nil { + return nil, err + } + req.SetPathParam("try_number", pathParam) } - var pathParam4 string - pathParam4, err = runtime.StyleParamWithLocation("simple", false, "map_index", runtime.ParamLocationPath, mapIndex) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "map_index", runtime.ParamLocationPath, mapIndex) + if err != nil { + return nil, err + } + req.SetPathParam("map_index", pathParam) } - var pathParam5 string - pathParam5, err = runtime.StyleParamWithLocation("simple", false, "state", runtime.ParamLocationPath, state) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "state", runtime.ParamLocationPath, state) + if err != nil { + return nil, err + } + req.SetPathParam("state", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("task_id", pathParam1). - SetPathParam("run_id", pathParam2). - SetPathParam("try_number", pathParam3). - SetPathParam("map_index", pathParam4). - SetPathParam("state", pathParam5). - SetResult(&res). + resp, err = req. 
Patch("edge_worker/v1/jobs/state/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}/{state}") if err != nil { return resp, err @@ -910,47 +915,52 @@ type logsClient struct { // filePathResponse performs the HTTP request and returns the lower level [resty.Response] func (c *logsClient) filePathResponse(ctx context.Context, dagId string, taskId string, runId string, tryNumber int, mapIndex int) (resp *resty.Response, err error) { + var res string - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var pathParam2 string - pathParam2, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "try_number", runtime.ParamLocationPath, tryNumber) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "try_number", runtime.ParamLocationPath, tryNumber) + if err != nil { + return nil, err + } + req.SetPathParam("try_number", 
pathParam) } - var pathParam4 string - pathParam4, err = runtime.StyleParamWithLocation("simple", false, "map_index", runtime.ParamLocationPath, mapIndex) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "map_index", runtime.ParamLocationPath, mapIndex) + if err != nil { + return nil, err + } + req.SetPathParam("map_index", pathParam) } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("task_id", pathParam1). - SetPathParam("run_id", pathParam2). - SetPathParam("try_number", pathParam3). - SetPathParam("map_index", pathParam4). - SetResult(&res). + resp, err = req. Get("edge_worker/v1/logs/logfile_path/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}") if err != nil { return resp, err @@ -969,51 +979,57 @@ func (c *logsClient) filePath(ctx context.Context, dagId string, taskId string, // PushResponse performs the HTTP request and returns the lower level [resty.Response] func (c *logsClient) PushResponse(ctx context.Context, dagId string, taskId string, runId string, tryNumber int, mapIndex int, body *PushLogsBody) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "dag_id", runtime.ParamLocationPath, dagId) + if err != nil { + return nil, err + } + req.SetPathParam("dag_id", pathParam) } - var pathParam1 string - pathParam1, err = runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "task_id", runtime.ParamLocationPath, taskId) + if err != nil { + return nil, err + } + req.SetPathParam("task_id", pathParam) } - var 
pathParam2 string - pathParam2, err = runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "run_id", runtime.ParamLocationPath, runId) + if err != nil { + return nil, err + } + req.SetPathParam("run_id", pathParam) } - var pathParam3 string - pathParam3, err = runtime.StyleParamWithLocation("simple", false, "try_number", runtime.ParamLocationPath, tryNumber) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "try_number", runtime.ParamLocationPath, tryNumber) + if err != nil { + return nil, err + } + req.SetPathParam("try_number", pathParam) } - var pathParam4 string - pathParam4, err = runtime.StyleParamWithLocation("simple", false, "map_index", runtime.ParamLocationPath, mapIndex) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "map_index", runtime.ParamLocationPath, mapIndex) + if err != nil { + return nil, err + } + req.SetPathParam("map_index", pathParam) } if body == nil { return nil, fmt.Errorf("Push requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("dag_id", pathParam0). - SetPathParam("task_id", pathParam1). - SetPathParam("run_id", pathParam2). - SetPathParam("try_number", pathParam3). - SetPathParam("map_index", pathParam4). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). 
Post("edge_worker/v1/logs/push/{dag_id}/{task_id}/{run_id}/{try_number}/{map_index}") if err != nil { return resp, err @@ -1036,23 +1052,25 @@ type workerClient struct { // UpdateQueuesResponse performs the HTTP request and returns the lower level [resty.Response] func (c *workerClient) UpdateQueuesResponse(ctx context.Context, workerName string, body *WorkerQueueUpdateBody) (resp *resty.Response, err error) { + var res interface{} - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) + if err != nil { + return nil, err + } + req.SetPathParam("worker_name", pathParam) } if body == nil { return nil, fmt.Errorf("UpdateQueues requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("worker_name", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). 
Patch("edge_worker/v1/worker/queues/{worker_name}") if err != nil { return resp, err @@ -1071,23 +1089,25 @@ func (c *workerClient) UpdateQueues(ctx context.Context, workerName string, body // SetStateResponse performs the HTTP request and returns the lower level [resty.Response] func (c *workerClient) SetStateResponse(ctx context.Context, workerName string, body *WorkerStateBody) (resp *resty.Response, err error) { + var res WorkerSetStateReturn - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) + if err != nil { + return nil, err + } + req.SetPathParam("worker_name", pathParam) } if body == nil { return nil, fmt.Errorf("SetState requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("worker_name", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). 
Patch("edge_worker/v1/worker/{worker_name}") if err != nil { return resp, err @@ -1106,23 +1126,25 @@ func (c *workerClient) SetState(ctx context.Context, workerName string, body *Wo // RegisterResponse performs the HTTP request and returns the lower level [resty.Response] func (c *workerClient) RegisterResponse(ctx context.Context, workerName string, body *WorkerStateBody) (resp *resty.Response, err error) { + var res WorkerRegistrationReturn - var pathParam0 string + req := c.R().SetContext(ctx).SetResult(&res) - pathParam0, err = runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) - if err != nil { - return nil, err + { + pathParam, err := runtime.StyleParamWithLocation("simple", false, "worker_name", runtime.ParamLocationPath, workerName) + if err != nil { + return nil, err + } + req.SetPathParam("worker_name", pathParam) } if body == nil { return nil, fmt.Errorf("Register requires a non-nil body argument") } - resp, err = c.R(). - SetContext(ctx). - SetPathParam("worker_name", pathParam0). + resp, err = req. + SetContentType("application/json"). SetBody(body). - SetResult(&res). Post("edge_worker/v1/worker/{worker_name}") if err != nil { return resp, err diff --git a/go-sdk/pkg/worker/runner.go b/go-sdk/pkg/worker/runner.go index 29a4b659bb0..4065e30ad70 100644 --- a/go-sdk/pkg/worker/runner.go +++ b/go-sdk/pkg/worker/runner.go @@ -37,6 +37,10 @@ import ( ) type ( + // task is an interface of an task implementation. 
+ Task interface { + Execute(ctx context.Context, logger *slog.Logger) error + } // Bundle interface defines a type that is used "at execution time" to lookup a Task to execute Bundle interface { LookupTask(dagId, taskId string) (Task, bool) diff --git a/go-sdk/pkg/worker/task.go b/go-sdk/pkg/worker/task.go deleted file mode 100644 index aeeb517187e..00000000000 --- a/go-sdk/pkg/worker/task.go +++ /dev/null @@ -1,158 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package worker - -import ( - "context" - "fmt" - "log/slog" - "reflect" - "runtime" - - "github.com/apache/airflow/go-sdk/sdk" -) - -type ( - taskFunction struct { - fn reflect.Value - fullName string - } - Task interface { - Execute(ctx context.Context, logger *slog.Logger) error - } -) - -var _ Task = (*taskFunction)(nil) - -func NewTaskFunction(fn any) (Task, error) { - v := reflect.ValueOf(fn) - fullName := runtime.FuncForPC(v.Pointer()).Name() - f := &taskFunction{v, fullName} - return f, f.validateFn(v.Type()) -} - -func (f *taskFunction) Execute(ctx context.Context, logger *slog.Logger) error { - fnType := f.fn.Type() - - reflectArgs := make([]reflect.Value, fnType.NumIn()) - for i := range reflectArgs { - in := fnType.In(i) - - switch { - case isContext(in): - reflectArgs[i] = reflect.ValueOf(ctx) - case isLogger(in): - reflectArgs[i] = reflect.ValueOf(logger) - case isClient(in): - reflectArgs[i] = reflect.ValueOf(sdk.NewClient()) - default: - // TODO: deal with other value types. 
For now they will all be Zero values unless it's a context - reflectArgs[i] = reflect.Zero(in) - } - } - slog.Debug("Attempting to call fn", "fn", f.fn, "args", reflectArgs) - retValues := f.fn.Call(reflectArgs) - - var err error - if errResult := retValues[len(retValues)-1].Interface(); errResult != nil { - var ok bool - if err, ok = errResult.(error); !ok { - return fmt.Errorf( - "failed to extract task error result as it is not of error interface: %v", - errResult, - ) - } - } - // If there are two results, convert the first only if it's not a nil pointer - var res any - if len(retValues) > 1 && (retValues[0].Kind() != reflect.Ptr || !retValues[0].IsNil()) { - res = retValues[0].Interface() - } - // TODO: send the result to XCom - _ = res - return err -} - -func (f *taskFunction) validateFn(fnType reflect.Type) error { - if fnType.Kind() != reflect.Func { - return fmt.Errorf("expected a func as input but was %s", fnType.Kind()) - } - - // Return values - // `<result>, error`, or just `error` - if fnType.NumOut() < 1 || fnType.NumOut() > 2 { - return fmt.Errorf( - "task function %s has %d return values, must be `<result>, error` or just `error`", - f.fullName, - fnType.NumOut(), - ) - } - if fnType.NumOut() > 1 && !isValidResultType(fnType.Out(0)) { - return fmt.Errorf( - "expected task function %s first return value to return valid type but found: %v", - f.fullName, - fnType.Out(0).Kind(), - ) - } - if !isError(fnType.Out(fnType.NumOut() - 1)) { - return fmt.Errorf( - "expected task function %s last return value to return error but found %v", - f.fullName, - fnType.Out(fnType.NumOut()-1).Kind(), - ) - } - return nil -} - -func isValidResultType(inType reflect.Type) bool { - // https://golang.org/pkg/reflect/#Kind - switch inType.Kind() { - case reflect.Func, reflect.Chan, reflect.UnsafePointer: - return false - } - - return true -} - -var ( - errorType = reflect.TypeFor[error]() - contextType = reflect.TypeFor[context.Context]() - slogLoggerType = 
reflect.TypeFor[*slog.Logger]() - - connClientType = reflect.TypeFor[sdk.ConnectionClient]() - varClientType = reflect.TypeFor[sdk.VariableClient]() - clientType = reflect.TypeFor[sdk.Client]() -) - -func isError(inType reflect.Type) bool { - return inType != nil && inType.Implements(errorType) -} - -func isContext(inType reflect.Type) bool { - return inType != nil && inType.Implements(contextType) -} - -func isLogger(inType reflect.Type) bool { - return inType != nil && inType.AssignableTo(slogLoggerType) -} - -func isClient(inType reflect.Type) bool { - return inType != nil && (inType.AssignableTo(clientType) || - inType.AssignableTo(connClientType) || - inType.AssignableTo(varClientType)) -} diff --git a/go-sdk/pkg/worker/task_test.go b/go-sdk/pkg/worker/task_test.go deleted file mode 100644 index 975ea329bf8..00000000000 --- a/go-sdk/pkg/worker/task_test.go +++ /dev/null @@ -1,119 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package worker - -import ( - "context" - "log/slog" - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/apache/airflow/go-sdk/pkg/logging" - "github.com/apache/airflow/go-sdk/sdk" -) - -type TaskSuite struct { - suite.Suite -} - -func TestTaskSuite(t *testing.T) { - suite.Run(t, &TaskSuite{}) -} - -func (s *TaskSuite) TestReturnValidation() { - cases := map[string]struct { - fn any - errContains string - }{ - "no-ret-values": { - func() {}, - `func\d+ has 0 return values, must be`, - }, - "too-many-ret-values": { - func() (a, b, c int) { return }, - `func\d+ has 3 return values, must be`, - }, - "invalid-ret": { - func() (c chan int) { return }, - `func\d+ last return value to return error but found chan`, - }, - } - - for name, tt := range cases { - s.Run(name, func() { - _, err := NewTaskFunction(tt.fn) - if s.Assert().Error(err) { - s.Assert().Regexp(tt.errContains, err.Error()) - } - }) - } -} - -func (s *TaskSuite) TestArgumentBinding() { - cases := map[string]struct { - fn any - }{ - "no-args": { - func() error { return nil }, - }, - "context": { - func(ctx context.Context) error { - s.Equal("def", ctx.Value("abc")) - return nil - }, - }, - "context-and-logger": { - func(ctx context.Context, logger *slog.Logger) error { - s.Equal("def", ctx.Value("abc")) - s.NotNil(logger) - return nil - }, - }, - "client": { - func(client sdk.Client) error { - s.NotNil(client) - return nil - }, - }, - "var-client": { - func(client sdk.VariableClient) error { - s.NotNil(client) - return nil - }, - }, - "conn-client": { - func(client sdk.ConnectionClient) error { - s.NotNil(client) - - return nil - }, - }, - } - - for name, tt := range cases { - s.Run(name, func() { - task, err := NewTaskFunction(tt.fn) - s.Require().NoError(err) - - ctx := context.WithValue(context.Background(), "abc", "def") - logger := slog.New(logging.NewTeeLogger()) - task.Execute(ctx, logger) - }) - } -} diff --git a/go-sdk/sdk/client.go b/go-sdk/sdk/client.go index 
92d4031edfe..4d0192e0baa 100644 --- a/go-sdk/sdk/client.go +++ b/go-sdk/sdk/client.go @@ -52,7 +52,6 @@ func (*client) GetVariable(ctx context.Context, key string) (string, error) { resp, err := httpClient.Variables().Get(ctx, key) if err != nil { var httpError *api.GeneralHTTPError - errors.As(err, &httpError) if errors.As(err, &httpError) && httpError.Response.StatusCode() == 404 { err = fmt.Errorf("%w: %q", VariableNotFound, key) } @@ -79,7 +78,6 @@ func (*client) GetConnection(ctx context.Context, connID string) (Connection, er resp, err := httpClient.Connections().Get(ctx, connID) if err != nil { var httpError *api.GeneralHTTPError - errors.As(err, &httpError) if errors.As(err, &httpError) && httpError.Response.StatusCode() == 404 { err = fmt.Errorf("%w: %q", ConnectionNotFound, connID) } @@ -88,3 +86,57 @@ func (*client) GetConnection(ctx context.Context, connID string) (Connection, er return connFromAPIResponse(resp) } + +func (c *client) PutXComForTi( + ctx context.Context, + ti api.TaskInstance, + key string, + value any, +) error { + return c.PutXCom(ctx, ti.DagId, ti.RunId, ti.TaskId, ti.MapIndex, key, value) +} + +func (*client) PutXCom( + ctx context.Context, + dagId, runId, taskId string, + mapIndex *int, + key string, + value any, +) error { + params := api.SetXcomParams{} + + if mapIndex != nil && *mapIndex != -1 { + params.MapIndex = mapIndex + } + + httpClient := ctx.Value(sdkcontext.ApiClientContextKey).(api.ClientInterface) + _, err := httpClient.Xcoms().SetResponse(ctx, dagId, runId, taskId, key, &params, &value) + if err != nil { + return err + } + return nil +} + +func (*client) GetXCom( + ctx context.Context, + dagId, runId, taskId string, + mapIndex *int, + key string, + value any, +) (any, error) { + params := api.GetXcomParams{ + MapIndex: mapIndex, + } + + httpClient := ctx.Value(sdkcontext.ApiClientContextKey).(api.ClientInterface) + res, err := httpClient.Xcoms().Get(ctx, dagId, runId, taskId, key, &params) + if err != nil { + var httpError 
*api.GeneralHTTPError + if errors.As(err, &httpError) && httpError.Response.StatusCode() == 404 { + err = fmt.Errorf("%w: %q", XComNotFound, key) + } + return nil, err + } + // TODO: We probably want to do some level of xcom deser here + return res.Value, nil +} diff --git a/go-sdk/sdk/errors.go b/go-sdk/sdk/errors.go index a3cd37c9a95..d8b071f0944 100644 --- a/go-sdk/sdk/errors.go +++ b/go-sdk/sdk/errors.go @@ -32,3 +32,5 @@ var VariableNotFound = errors.New("variable not found") // // See the “GetConnection“ method of [ConnectionClient] for an example var ConnectionNotFound = errors.New("connection not found") + +var XComNotFound = errors.New("xcom not found") diff --git a/go-sdk/sdk/sdk.go b/go-sdk/sdk/sdk.go index 31f9000b9ce..aeca0d7935d 100644 --- a/go-sdk/sdk/sdk.go +++ b/go-sdk/sdk/sdk.go @@ -22,6 +22,8 @@ package sdk import ( "context" + + "github.com/apache/airflow/go-sdk/pkg/api" ) const ( @@ -61,7 +63,26 @@ type ConnectionClient interface { GetConnection(ctx context.Context, connID string) (Connection, error) } +type XComClient interface { + GetXCom( + ctx context.Context, + dagId, runId, taskId string, + mapIndex *int, + key string, + value any, + ) (any, error) + PutXComForTi(ctx context.Context, ti api.TaskInstance, key string, value any) error + PutXCom( + ctx context.Context, + dagId, runId, taskId string, + mapIndex *int, + key string, + value any, + ) error +} + type Client interface { VariableClient ConnectionClient + XComClient }
