ashb opened a new pull request, #43893:
URL: https://github.com/apache/airflow/pull/43893

   The eventual goal of this "airflow.sdk.execution_time" package is to replace
   LocalTaskJob and StandardTaskRunner, but at this stage it co-exists with the
   classes it will eventually replace.
   
   As this PR is not a complete re-implementation of all the features that exist
   currently (no handling of task-level callbacks yet, no AirflowSkipException,
   etc.) the current tests are skeletal at best. Once we get closer to feature
   parity (in future PRs) the tests will grow to match.
   
   This supervisor and task runner operate slightly differently from the current
   classes in the following ways:
   
   **Logs from the subprocess are sent over a channel separate from stdout/stderr**
   
   This makes the task supervisor a little bit more complex as it now has to
   read stdout, stderr and a logs channel. The advantage of this approach is
   that it makes the logging setup in the task process itself markedly simpler --
   all it has to do is write log output to the custom file handle as JSON and
   it will show up "natively" as logs.
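
   As an illustration only -- a minimal sketch, not the PR's actual implementation,
   and the fd-passing scheme here is an assumption -- the supervisor side can
   multiplex stdout, stderr and the logs pipe with the `selectors` module, while the
   child simply writes JSON lines to the inherited file descriptor:

   ```python
   import json
   import os
   import selectors
   import subprocess
   import sys

   # Extra pipe that carries structured logs, separate from stdout/stderr.
   log_read, log_write = os.pipe()

   child_code = (
       "import json, os, sys;"
       " fd = int(sys.argv[1]);"
       " os.write(fd, (json.dumps({'event': 'task started', 'level': 'info'}) + '\\n').encode());"
       " print('plain stdout output')"
   )
   child = subprocess.Popen(
       [sys.executable, "-c", child_code, str(log_write)],
       stdout=subprocess.PIPE,
       stderr=subprocess.PIPE,
       pass_fds=(log_write,),  # keep the logs fd open (same number) in the child
   )
   os.close(log_write)  # the parent only reads; the child owns the write end

   sel = selectors.DefaultSelector()
   sel.register(child.stdout, selectors.EVENT_READ, "stdout")
   sel.register(child.stderr, selectors.EVENT_READ, "stderr")
   sel.register(log_read, selectors.EVENT_READ, "logs")

   open_channels = 3
   while open_channels:
       for key, _ in sel.select():
           data = os.read(key.fd, 4096)
           if not data:  # EOF on this channel
               sel.unregister(key.fileobj)
               open_channels -= 1
           elif key.data == "logs":
               for line in data.decode().splitlines():
                   print("structured log:", json.loads(line))
           else:
               print(f"{key.data}: {data.decode()!r}")
   child.wait()
   ```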
   
   structlog has been chosen as the logging engine over stdlib's own logging, as
   the ability to have structured fields in the logs is nice; stdlib logging is
   configured to send its records to a structlog processor.
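
   Roughly, the setup looks like the sketch below (assuming a reasonably recent
   structlog; the real configuration writes to the logs channel rather than
   stdout):

   ```python
   import logging
   import sys

   import structlog

   # One processor chain renders everything -- structlog events and stdlib
   # records alike -- as JSON lines. In the task process the handler would be
   # attached to the dedicated logs file handle instead of stdout.
   formatter = structlog.stdlib.ProcessorFormatter(
       processors=[
           structlog.processors.TimeStamper(fmt="iso"),
           structlog.stdlib.ProcessorFormatter.remove_processors_meta,
           structlog.processors.JSONRenderer(),
       ],
   )
   handler = logging.StreamHandler(sys.stdout)
   handler.setFormatter(formatter)
   logging.basicConfig(handlers=[handler], level=logging.INFO, force=True)

   structlog.configure(
       processors=[
           structlog.stdlib.add_log_level,
           structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
       ],
       logger_factory=structlog.stdlib.LoggerFactory(),
       wrapper_class=structlog.stdlib.BoundLogger,
   )

   structlog.get_logger().info("task started", try_number=1)   # structured fields
   logging.getLogger("airflow.task").info("plain stdlib record")  # also JSON
   ```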
   
   **Direct database access is replaced with an HTTP API client**
   
   This is the crux of this feature and of AIP-72 in general -- tasks run via
   this runner can no longer access DB models or a DB session directly. This PR
   doesn't yet implement the code/shims to make `Connection.get_connection_from_secrets`
   use this client - that will be future work.
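
   To make the shape of that future work concrete, a task-side shim could look
   roughly like this hypothetical sketch (every name here is made up for
   illustration; the real request/response types will live in this package):

   ```python
   from __future__ import annotations

   import json
   import sys
   from dataclasses import dataclass


   @dataclass
   class ConnectionResult:
       """Stand-in for the real response datamodel."""

       conn_id: str
       conn_type: str
       host: str | None = None


   def get_connection(conn_id: str, requests=sys.stdout, responses=sys.stdin) -> ConnectionResult:
       # Instead of opening a DB session, send a typed request to the supervisor
       # over the shared channel; the supervisor forwards it to the API server
       # using the HTTP session it already holds.
       requests.write(json.dumps({"type": "GetConnection", "conn_id": conn_id}) + "\n")
       requests.flush()
       # Block until the supervisor writes the response payload back.
       return ConnectionResult(**json.loads(responses.readline()))
   ```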
   
   Tasks don't speak directly to the API server for two main reasons:
   
   1. The supervisor process already needs to maintain an HTTP session in order
      to report the task as started, to heartbeat it, and to mark it as
      finished; and
   2. routing task requests through the supervisor keeps the number of active
      HTTP connections at one per task (instead of two per task).
   
   The other reason we have this interface is that the DAG parsing code will very
   soon need to be updated to not have direct DB access either, and having this
   "in process" interface already means that we can support commands like
   `airflow dags reserialize` without having a running API server.
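
   As a sketch of that "in process" idea (assuming an httpx-based client and that
   the execution API can be exposed as a WSGI app -- both assumptions, not settled
   by this PR), the same client could be pointed either at a URL or straight at
   the app object:

   ```python
   import httpx


   def make_execution_api_client(base_url: str = "", app=None) -> httpx.Client:
       # Hypothetical helper. With `app` set, requests are dispatched directly
       # into the application in-process -- no running API server needed, which
       # is what commands like `airflow dags reserialize` would rely on.
       if app is not None:
           return httpx.Client(transport=httpx.WSGITransport(app=app), base_url="http://in-process")
       # Otherwise talk to a real API server over the network, as the task
       # supervisor does.
       return httpx.Client(base_url=base_url)
   ```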
   
   The API client itself is not auto-generated: I tried a number of different
   client generators based on the OpenAPI spec and found them all lacking or buggy
   in different ways, and the HTTP client side itself is very simple anyway. The
   only interesting/difficult bit is generating the datamodels from the OpenAPI
   spec, and for that I did find a generator that works well.
   
   msgspec was chosen over Pydantic as it is much lighter weight (and thus
   quicker), especially on the client side where we have next to no validation
   requirements for the response data. I admit that I have not benchmarked it
   specifically though.
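
   As a rough illustration of the approach (the model name and fields below are
   invented, not the generated ones), the datamodels are plain `msgspec.Struct`s
   and decoding a response body is a one-liner:

   ```python
   import msgspec


   class TIRunContext(msgspec.Struct):
       """Hypothetical response model; the real ones come from the OpenAPI spec."""

       dag_id: str
       task_id: str
       try_number: int = 1


   decoder = msgspec.json.Decoder(TIRunContext)
   payload = b'{"dag_id": "example", "task_id": "hello", "try_number": 2}'
   ctx = decoder.decode(payload)  # wrong types raise, but there is no heavy validation layer
   print(ctx.dag_id, ctx.try_number)
   ```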
   