jason810496 commented on code in PR #68311:
URL: https://github.com/apache/airflow/pull/68311#discussion_r3385142130


##########
go-sdk/example/bundle/main.go:
##########
@@ -60,7 +60,48 @@ func main() {
        }
 }
 
-func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
+// ExtractResult is extract's return value. Returning it from the task pushes it
+// as the task's return_value XCom, ready for a downstream task to pull.
+type ExtractResult struct {
+       GoVersion string `json:"go_version"`
+       Timestamp int64  `json:"timestamp"`
+}
+
+// ExtractInput declares extract's XCom inputs. The `xcom:"python_task_1"` tag
+// binds this field to the return_value of the upstream Python task
+// `python_task_1`, so the Go task receives a Python task's output without
+// calling client.GetXCom itself. The runtime pulls and decodes it before
+// extract runs.
+type ExtractInput struct {
+       FromPython string `xcom:"python_task_1"`
+}
+
+// TransformInput binds transform's only XCom input to extract's return_value.
+// Because the field is a dedicated struct, decoding is strict: a renamed or
+// unexpected key fails the task instead of silently leaving fields zero. To
+// decode loosely (e.g. for an evolving or cross-language producer), type the
+// field map[string]any instead.
+type TransformInput struct {
+       Extracted ExtractResult `xcom:"extract"`
+}
+
+// TransformResult is transform's return value.
+type TransformResult struct {
+       Variable  string        `json:"variable"`
+       Extracted ExtractResult `json:"extracted"`
+}

Review Comment:
   For the return type, users should give every field a `json` tag.



##########
go-sdk/example/bundle/main.go:
##########
@@ -60,7 +60,48 @@ func main() {
        }
 }
 
-func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
+// ExtractResult is extract's return value. Returning it from the task pushes it
+// as the task's return_value XCom, ready for a downstream task to pull.
+type ExtractResult struct {
+       GoVersion string `json:"go_version"`
+       Timestamp int64  `json:"timestamp"`
+}
+
+// ExtractInput declares extract's XCom inputs. The `xcom:"python_task_1"` tag
+// binds this field to the return_value of the upstream Python task
+// `python_task_1`, so the Go task receives a Python task's output without
+// calling client.GetXCom itself. The runtime pulls and decodes it before
+// extract runs.
+type ExtractInput struct {
+       FromPython string `xcom:"python_task_1"`
+}

Review Comment:
   No matter how many upstream tasks there are, we inject all of the upstream XCom results as a single struct.
   
   Users should declare every upstream `task_id` in an `xcom:"<task_id>"` tag (or `xcom:"<task_id>,key=<key>"` if they are not using the default `return_value` key).
   
   Other directions I considered:
   - Defining the injection at the `AddTask` level: IMO this would be ambiguous for users, because at this stage the Dag structure is still defined on the Python side. With an interface like `AddTask(<task_id>, <other task function handle> ... <or task id>)`, users would be unsure whether they are defining the edges on the Go side instead of the Python side.



##########
go-sdk/pkg/execution/client.go:
##########
@@ -206,6 +206,14 @@ func (c *CoordinatorClient) GetXCom(
                return nil, fmt.Errorf("decoding xcom result: %w", err)
        }
 
+       if result.Value == nil {
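+               // The supervisor returns a null value for an absent XCom (Airflow's
+               // xcom_pull yields None when the key is missing), so surface it as
+               // not-found, matching how the HTTP client maps a 404 and how
+               // GetVariable handles the same condition above.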
+               // not-found, matching how the HTTP client maps a 404 and how
+               // GetVariable handles the same condition above.
+               return nil, fmt.Errorf("%w: %q", sdk.XComNotFound, key)

Review Comment:
   This is intentionally returned as an error; otherwise a missing XCom would surface as a decoding error.



##########
go-sdk/pkg/binding/binding.go:
##########
@@ -0,0 +1,354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package binding turns a task function's parameter list into the concrete
+// argument values it is called with at execution time. It is shared by both
+// execution paths (the coordinator runtime and the Edge worker), so it depends
+// only on the SDK surface and not on the bundle registry.
+//
+// Two kinds of parameter are supported:
+//
+//   - Injectable runtime values: context.Context, *slog.Logger, and
+//     sdk.Client (or the narrower sdk.VariableClient / sdk.ConnectionClient).
+//   - XCom-input structs: a struct (or pointer to one) whose exported fields
+//     each carry an `xcom:"<task_id>[,key=<key>]"` tag. Each field is pulled
+//     from the named upstream task in the current dag run and decoded into the
+//     field's type, so an author receives an upstream's return value as a typed
+//     parameter without calling the client explicitly.
+//
+// Analyze inspects a function once at registration and returns a Plan; Resolve
+// builds the call arguments for each execution from that Plan.
+package binding
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "fmt"
+       "log/slog"
+       "reflect"
+       "strings"
+
+       "github.com/apache/airflow/go-sdk/pkg/api"
+       "github.com/apache/airflow/go-sdk/pkg/sdkcontext"
+       "github.com/apache/airflow/go-sdk/sdk"
+)
+

Review Comment:
   Extracted most of the reflection and parameter-binding logic into a new `binding` package so it doesn't overwhelm the `task` package.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
