ashb opened a new pull request, #43893:
URL: https://github.com/apache/airflow/pull/43893
The eventual goal of this "airflow.sdk.execution_time" package is to replace
LocalTaskJob and StandardTaskRunner, but at this stage it co-exists with the
classes it will replace.
As this PR is not a complete re-implementation of all the features that exist
currently (no handling of task level callbacks yet, no AirflowSkipException
etc.) the current tests are skeleton at best. Once we get closer to feature
parity (in future PRs) the tests will grow to match.
This supervisor and task runner operate slightly differently from the current
classes in these ways:
**Logs from the subprocess are sent over a different channel to
stdout/stderr**
This makes the task supervisor a little bit more complex as it now has to
read stdout, stderr and a logs channel. The advantage of this approach is
that it makes the logging setup in the task process itself markedly simpler --
all it has to do is write log output to the custom file handle as JSON and
it will show up "natively" as logs.
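To illustrate the three-channel arrangement described above, here is a minimal sketch of a supervisor reading stdout, stderr and a separate logs channel from one child process. It is not the code from this PR: `CHILD_SOURCE`, `run_supervised` and the JSON field names are hypothetical, and it assumes a POSIX system because it relies on `pass_fds`.

```python
import json
import os
import selectors
import subprocess
import sys

# Hypothetical child: it receives the log fd number in argv, prints to
# stdout/stderr as usual, and writes one JSON object per line to the log fd.
CHILD_SOURCE = r"""
import json, os, sys
log_fd = int(sys.argv[1])
print("plain stdout line", flush=True)
print("plain stderr line", file=sys.stderr, flush=True)
with os.fdopen(log_fd, "w") as logs:
    logs.write(json.dumps({"event": "task started", "level": "info"}) + "\n")
"""


def run_supervised():
    """Run the child and return every complete line it wrote, tagged by channel."""
    log_read, log_write = os.pipe()
    proc = subprocess.Popen(
        [sys.executable, "-c", CHILD_SOURCE, str(log_write)],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        pass_fds=(log_write,),  # keep the log fd open in the child (POSIX only)
    )
    os.close(log_write)  # the parent keeps only the read end, so it sees EOF

    channels = {
        proc.stdout.fileno(): "stdout",
        proc.stderr.fileno(): "stderr",
        log_read: "logs",
    }
    buffers = {fd: b"" for fd in channels}
    sel = selectors.DefaultSelector()
    for fd in channels:
        sel.register(fd, selectors.EVENT_READ)

    received = []
    while sel.get_map():  # loop until every channel has reached EOF
        for key, _ in sel.select():
            chunk = os.read(key.fd, 4096)
            if not chunk:
                sel.unregister(key.fd)
                continue
            buffers[key.fd] += chunk
            # Split off complete lines; keep any trailing partial line buffered.
            *lines, buffers[key.fd] = buffers[key.fd].split(b"\n")
            for line in lines:
                name = channels[key.fd]
                text = line.decode()
                received.append((name, json.loads(text) if name == "logs" else text))

    sel.close()
    proc.wait()
    proc.stdout.close()
    proc.stderr.close()
    os.close(log_read)
    return received


if __name__ == "__main__":
    for channel, payload in run_supervised():
        print(channel, payload)
```

The order in which lines from different channels arrive is not deterministic, so a real supervisor would likely timestamp or otherwise tag each line as it is read.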
structlog was chosen as the logging engine over stdlib's own logging because
structured fields in the logs are useful; stdlib logging is configured to
send its logs to a structlog processor.
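As a rough illustration of routing stdlib logging into a structured JSON stream, the sketch below uses only the standard library as a stand-in; it does not use structlog and is not the configuration from this PR. `JsonLinesHandler`, `configure_logging` and the `fields` key are hypothetical names, and the `io.StringIO` object stands in for the custom log file handle.

```python
import io
import json
import logging


class JsonLinesHandler(logging.Handler):
    """Hypothetical handler: serialise each stdlib record as one JSON line."""

    def __init__(self, stream):
        super().__init__()
        self.stream = stream

    def emit(self, record):
        event = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "logger": record.name,
        }
        # Values passed via `extra=` become record attributes; this sketch
        # assumes structured fields are grouped under a `fields` key.
        event.update(getattr(record, "fields", {}))
        self.stream.write(json.dumps(event) + "\n")
        self.stream.flush()


def configure_logging(stream):
    """Send everything logged under the 'task' logger to the JSON stream."""
    logger = logging.getLogger("task")
    logger.handlers = [JsonLinesHandler(stream)]
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


# In a task process the stream would be the dedicated log file handle.
channel = io.StringIO()
log = configure_logging(channel)
log.info("fetched %d rows", 3, extra={"fields": {"table": "users"}})
```

With structlog the same idea would be expressed through its processor chain rather than a hand-written handler; the point here is only that stdlib log calls end up as structured JSON on one handle.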
**Direct database access is replaced with an HTTP API client**
This is the crux of this feature and of AIP-72 in general -- tasks run via
this runner can no longer access DB models or a DB session directly. This PR
doesn't yet implement the code/shims to make
`Connection.get_connection_from_secrets`
use this client; that will be future work.
Tasks don't speak directly to the API server, primarily for two reasons:
1. The supervisor process already needs to maintain an HTTP session in order
to report the task as started, to heartbeat it, and to mark it as
finished; and so because of that
2. It reduces the number of active HTTP connections for tasks to 1 per task
(instead of 2 per task).
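The two points above can be sketched as a task that sends its requests to the supervisor over a local socket, with the supervisor being the only party that talks to the API server. This is an assumption-laden illustration, not the protocol from this PR: `SupervisorProxy`, `TaskSideClient`, the JSON message shape, the `/variables/my_var` path and `fake_api_call` (a stand-in for the supervisor's real HTTP session) are all hypothetical.

```python
import json
import socket
import threading


def send_msg(sock, payload):
    """Send one JSON message terminated by a newline."""
    sock.sendall(json.dumps(payload).encode() + b"\n")


def recv_msg(sock):
    """Receive one newline-terminated JSON message (lockstep request/response)."""
    data = b""
    while not data.endswith(b"\n"):
        chunk = sock.recv(4096)
        if not chunk:
            raise EOFError("peer closed the connection")
        data += chunk
    return json.loads(data)


class SupervisorProxy:
    """Supervisor side: owns the single API session and answers task requests."""

    def __init__(self, sock, api_call):
        self.sock = sock
        self.api_call = api_call  # stand-in for the supervisor's HTTP client

    def serve_one(self):
        request = recv_msg(self.sock)
        result = self.api_call(request["method"], request["path"])
        send_msg(self.sock, {"result": result})


class TaskSideClient:
    """Task side: no HTTP connection of its own, only the local socket."""

    def __init__(self, sock):
        self.sock = sock

    def get(self, path):
        send_msg(self.sock, {"method": "GET", "path": path})
        return recv_msg(self.sock)["result"]


def fake_api_call(method, path):
    # Hypothetical: a real supervisor would issue an HTTP request here.
    return {"method": method, "path": path, "value": "42"}


task_sock, supervisor_sock = socket.socketpair()
proxy = SupervisorProxy(supervisor_sock, fake_api_call)
worker = threading.Thread(target=proxy.serve_one)
worker.start()
response = TaskSideClient(task_sock).get("/variables/my_var")
worker.join()
task_sock.close()
supervisor_sock.close()
```

Because the callable handed to `SupervisorProxy` is pluggable, the same task-side code would work whether the supervisor forwards requests over HTTP or answers them some other way.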
The other reason we have this interface is that DAG parsing code will very
soon need to be updated to not have direct DB access either, and already
having this "in process" interface means that we can support commands like
`airflow dags reserialize` without having a running API server.
The API client itself is not auto-generated: I tried a number of different
client generators based on the OpenAPI spec and found them all lacking or
buggy in different ways. The HTTP client side itself is very simple; the only
interesting/difficult bit is the generation of the data models from the
OpenAPI spec which I found one that
msgspec was chosen over Pydantic as it is much lighter weight (and thus
quicker), especially on the client side where we have next to no validation
requirements for response data. I admit that I have not benchmarked it
specifically though.