Hello everyone,

I am excited to announce that the GoSDK is finally ready enough for people 
other than me to use and try out.

I said in the Keynote “as soon as I get back to my laptop”, and well, that 
turned into a week off and then noticing the docs needed updating, so here we 
are.

There are a few things to note about this beta release:

- As with all beta “releases” of Apache Airflow, it is not a formal release 
according to ASF rules; it is a preview for members of the development 
community to try out and give feedback on.

- This is a beta for a reason. While it works On My Machine (TM), it has not 
been extensively tested under various error conditions, and it will 
undoubtedly have rough edges.

- DAGs must still be defined in Python files. To make this work, the new 
`@task.stub` decorator has been added to the standard provider (already 
released before Airflow Summit).

- Use of the TaskFlow API does not yet support passing XComs or static values 
directly to stub tasks. (It is not complex to fix this, but unfortunately it 
needs both some provider and API server side changes so I’m not sure when this 
will be available.)

- This only works with the Edge Executor right now.

- Some API reference documentation is available from godoc at 
https://pkg.go.dev/github.com/apache/airflow/go-sdk (but that is not yet 
showing the tagged version; I might have missed a step somewhere).

A note on the high-level architecture of the GoSDK: there are two components 
involved on the worker side. The first is the “airflow-go-edge-worker”, which 
contains no user code; it speaks to the Edge Worker API to get tasks to 
execute, and then hands off to the second component, the dag bundle. The 
bundle is a pre-compiled Go binary “plugin” (using the hashicorp/go-plugin 
framework right now) which runs the actual tasks.

Feedback of any kind is always super valuable, whether it is about things that 
don’t work, things that are hard to understand, or just improvements you’d 
like. For now, please either reply to this email, or ping me on Slack. Soon we 
will add a new issue type to the repo.

An example DAG (taken from the readme[1]):

```python
from airflow.sdk import dag, task


@task.stub(queue="golang")
def extract(): ...


@task.stub(queue="golang")
def transform(): ...


@dag()
def simple_dag():
    extract() >> transform()


simple_dag()
```

And here are the task functions, and some of the registration code, from the 
example bundle in the repo[2]:

```go
func (m *myBundle) RegisterDags(dagbag v1.Registry) error {
        tutorial_dag := dagbag.AddDag("tutorial_dag")
        tutorial_dag.AddTask(extract)
        tutorial_dag.AddTask(transform)
        tutorial_dag.AddTask(load)

        return nil
}

func main() {
        bundlev1server.Serve(&myBundle{})
}

func extract(ctx context.Context, client sdk.Client, log *slog.Logger) (any, error) {
        for range 10 {
                // Once per loop, check if we've been asked to cancel!
                select {
                case <-ctx.Done():
                        return nil, ctx.Err()
                default:
                }
                log.Info("After the beep the time will be", "time", time.Now())
                time.Sleep(2 * time.Second)
        }
        log.Info("Goodbye from task")

        ret := map[string]any{
                "go_version": runtime.Version(),
        }

        return ret, nil
}

func transform(ctx context.Context, client sdk.VariableClient, log *slog.Logger) error {
        key := "my_variable"
        val, err := client.GetVariable(ctx, key)
        if err != nil {
                return err
        }
        log.Info("Obtained variable", key, val)
        return nil
}
```

Cheers,
Ash

[1]: https://github.com/apache/airflow/tree/go-sdk/v1.0.0-beta1/go-sdk
[2]: https://github.com/apache/airflow/blob/go-sdk/v1.0.0-beta1/go-sdk/example/bundle/main.go
