jason810496 commented on code in PR #68311:
URL: https://github.com/apache/airflow/pull/68311#discussion_r3388067811
##########
go-sdk/example/bundle/main.go:
##########
@@ -60,7 +60,48 @@ func main() {
}
}
-func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
+// ExtractResult is extract's return value. Returning it from the task pushes it
+// as the task's return_value XCom, ready for a downstream task to pull.
+type ExtractResult struct {
+ GoVersion string `json:"go_version"`
+ Timestamp int64 `json:"timestamp"`
+}
+
+// ExtractInput declares extract's XCom inputs. The `xcom:"python_task_1"` tag
+// binds this field to the return_value of the upstream Python task
+// `python_task_1`, so the Go task receives a Python task's output without
+// calling client.GetXCom itself. The runtime pulls and decodes it before
+// extract runs.
+type ExtractInput struct {
+ FromPython string `xcom:"python_task_1"`
+}
+
+// TransformInput binds transform's only XCom input to extract's return_value.
+// Because the field is a dedicated struct, decoding is strict: a renamed or
+// unexpected key fails the task instead of silently leaving fields zero. To
+// decode loosely (e.g. for an evolving or cross-language producer), type the
+// field map[string]any instead.
+type TransformInput struct {
+ Extracted ExtractResult `xcom:"extract"`
Review Comment:
I'm duplicating our discussion here for future reviewers' reference.
> This needs some more thinking about what we actually want out of a "TaskFlow for Go".
> Cos if I have this in my python dag:
>
> ```python
> @task.stub()
> def transform(country: str, input: Any)
>
> with DAG():
> transform("uk", extract())
> ```
>
> What should happen?
At the current stage, for the example above, both the Java and the Go side
still need to declare a corresponding argument for each input, and each
argument must reference its upstream `task_id`.
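```go
type TransformInput struct {
	// Fields must be exported so the runtime can set them, and the tag value
	// must be quoted to follow Go's struct tag convention.
	Country string `xcom:"transform_upstream_1"`
	Input   any    `xcom:"transform_upstream_2"`
}

func transform(
	ctx context.Context,
	transformed TransformInput,
) {
	...
}
```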
This is because we respect the Dag structure defined on the Python side.
That said, I'm still researching whether there are other possible user
interfaces for TaskFlow.
The Java SDK's TaskFlow can declare the upstream `task_id` reference inline,
like:
```java
public long transformValue(Client client, @Builder.XCom(task = "extract")
long extracted) {
}
```
This is why I chose the `xcom:"<task_id>"` tag on the input struct in the
first place.
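
For reference, here is a minimal sketch of how a runtime could resolve such tags with reflection. It is an illustration under stated assumptions, not the SDK's actual implementation: `xcomBindings` is a hypothetical helper, and the task IDs in the tags are placeholders. It only collects the field-to-`task_id` mapping; pulling and decoding the XCom values is out of scope.

```go
package main

import (
	"fmt"
	"reflect"
)

// TransformInput is a variant of the input struct discussed above, with
// exported fields and quoted tag values. The task IDs are placeholders.
type TransformInput struct {
	Country string `xcom:"transform_upstream_1"`
	Input   any    `xcom:"transform_upstream_2"`
}

// xcomBindings is a hypothetical helper (not part of the Go SDK). It returns
// a map from struct field name to the upstream task_id named in that field's
// `xcom` tag. Fields without an `xcom` tag are skipped.
func xcomBindings(input any) (map[string]string, error) {
	t := reflect.TypeOf(input)
	if t == nil {
		return nil, fmt.Errorf("input must be a struct, got nil")
	}
	// Accept both a struct value and a pointer to a struct.
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		return nil, fmt.Errorf("input must be a struct, got %s", t.Kind())
	}

	bindings := make(map[string]string)
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		taskID, ok := field.Tag.Lookup("xcom")
		if !ok {
			continue // field is not bound to an XCom
		}
		// Reflection cannot set unexported fields, so report them early.
		if !field.IsExported() {
			return nil, fmt.Errorf("field %s has an xcom tag but is unexported", field.Name)
		}
		bindings[field.Name] = taskID
	}
	return bindings, nil
}
```

Calling `xcomBindings(TransformInput{})` yields one entry per tagged field, keyed by field name. The tag value has to be quoted (`xcom:"extract"`) for `reflect.StructTag.Lookup` to parse it reliably, and the check for unexported fields reflects the fact that a reflection-based decoder cannot populate them.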