Hi,

*JSON RPC API*

Finally I had some time to work on an Internal API based on JSON RPC. I found that this solution can be really elegant and clean from a developer perspective. You can find the OpenAPI definition here: <https://github.com/apache/airflow/commit/e86c389a004b26f22953e804e365ffaee200107c#diff-e454d231c105c349be8619b3112c8bfda74f0d45b6f9503e206fc0c8d5f15351>

It expects three parameters:
- jsonrpc - all the examples had this, it may be important for some versioning?
- method_name - (string) name of the method to call
- params - parameters for the method
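For illustration, a call to the endpoint could then look roughly like this (the endpoint URL and the concrete params payload below are made up, not taken from the OpenAPI definition above):

    # Rough illustration only - the URL and the params payload are hypothetical.
    import requests

    body = {
        "jsonrpc": "2.0",                       # protocol version, as in the JSON RPC examples
        "method_name": "update_import_errors",  # name of the method to call
        "params": {"processor_subdir": "/files/dags"},  # method-specific arguments
    }
    response = requests.post("http://localhost:8080/internal_api/v1/rpcapi", json=body)
    print(response.json())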
Storing this file next to the existing Airflow API allows it to run as part of the web server. Of course, it can still be run independently.

To switch a method to make calls to the Internal API, I implemented the internal_api_call <https://github.com/apache/airflow/compare/main...mhenc:airflow:poc_44#diff-0f4f6450d07fffead241765439dcae32988ba1aceac81ed54c884505b813e882> decorator. Then we just add it to the method:

    @internal_api_call("sample_method")
    def sample_method(dag_id: str, ...):
        ...

The decorator will either call the method body directly or send a request to the Internal API endpoint, depending on the configuration.

On the API-server side we have an rpc_api handler <https://github.com/apache/airflow/commit/eaf3f8e418d8bf00641f09837c4cf9286b1877ab#diff-024141cc0f52a9f8e84b56c9de06b0487b1bc0cd0def4749ac6490816bab3890> which contains the method mapping:

    METHODS = {
        'update_import_errors': DagFileProcessor.update_import_errors,
        'process_file': processor.process_file,
    }

Note: `process_file` is just for testing purposes. All methods there should be static. The RPC endpoint handler then takes method_name from the request and calls the matching function from METHODS. We may add a presubmit check to make sure this mapping is properly populated.

*Args/Results serialization*

The main problem is serializing parameters and results and sending them over the wire. Since this is JSON RPC, serialization to JSON was my first idea; however, I found that it means we may need to define custom serialization for many methods (example <https://github.com/apache/airflow/commit/e86c389a004b26f22953e804e365ffaee200107c#diff-024141cc0f52a9f8e84b56c9de06b0487b1bc0cd0def4749ac6490816bab3890R55>) - and not one but four functions per method (params_to_json, params_from_json, result_to_json, result_from_json).

On the other hand, we could just use Pickle: serialize the args dictionary, convert it to base64 (so it can be sent safely inside the JSON), then deserialize it on the other side. This leads to a very elegant solution where all we need to do is make the method static and add the decorator (compare the rpc_api handler: pickle <https://github.com/apache/airflow/commit/eaf3f8e418d8bf00641f09837c4cf9286b1877ab#diff-024141cc0f52a9f8e84b56c9de06b0487b1bc0cd0def4749ac6490816bab3890> vs json <https://github.com/apache/airflow/commit/e86c389a004b26f22953e804e365ffaee200107c#diff-024141cc0f52a9f8e84b56c9de06b0487b1bc0cd0def4749ac6490816bab3890>).

*Unless there are strong objections to using pickle, I would recommend this solution.*

There is also yet another idea: use Pydantic. However, I found that it requires all objects to inherit from its base class - I'm not sure we should do that, but if you believe it would be useful, I can check whether it's feasible. The only problem I can see is the K8s V1Pod objects that are used in multiple places (to be honest, I didn't find a good way to serialize and deserialize them properly to/from JSON).
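To make the pickle variant more concrete, here is a rough end-to-end sketch of how the decorator and the handler could fit together. This is my own simplified illustration, not the code from the linked commits; the endpoint URL, the configuration flag and the helper names are placeholders:

    import base64
    import functools
    import pickle

    import requests

    INTERNAL_API_URL = "http://localhost:8080/internal_api/v1/rpcapi"  # placeholder
    USE_INTERNAL_API = True  # in reality this would come from the Airflow configuration


    def internal_api_call(method_name):
        """Run the decorated method locally, or forward the call to the RPC endpoint."""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                if not USE_INTERNAL_API:
                    return func(*args, **kwargs)
                # Pickle the arguments and wrap them in base64 so they travel safely inside JSON.
                params = base64.b64encode(pickle.dumps({"args": args, "kwargs": kwargs})).decode("ascii")
                body = {"jsonrpc": "2.0", "method_name": method_name, "params": params}
                response = requests.post(INTERNAL_API_URL, json=body)
                response.raise_for_status()
                # The result comes back the same way: a base64-encoded pickle.
                return pickle.loads(base64.b64decode(response.json()["result"]))
            return wrapper
        return decorator


    @internal_api_call("sample_method")
    def sample_method(dag_id: str) -> str:
        return f"processed {dag_id}"


    # Server side: the mapping of method names to static callables, plus the dispatch logic.
    METHODS = {
        "sample_method": sample_method.__wrapped__,  # the real body, not the decorated wrapper
    }


    def rpc_api_handler(body: dict) -> dict:
        """Simplified view function: look up the requested method and invoke it."""
        func = METHODS[body["method_name"]]
        payload = pickle.loads(base64.b64decode(body["params"]))
        result = func(*payload["args"], **payload["kwargs"])
        return {"result": base64.b64encode(pickle.dumps(result)).decode("ascii")}

This is only meant to visualize the flow described above - the point being that switching an existing method over really comes down to making it static and adding the decorator.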
*Tests*

I ran some tests to compare performance, using Jarek's testing scenarios <https://github.com/apache/airflow/pull/25094>. Tests were executed on localhost with:

    time airflow internal-api test-client --num-repeats 20 --num-callbacks 20 --test file_processor

Results:
- No Internal API: 104 s
- JSON serialization: 119 s
- Pickle: 115 s

I exported my changes to PRs:
https://github.com/apache/airflow/pull/27468/files
https://github.com/apache/airflow/pull/27469/files

Any comments appreciated.

Best regards,
Mateusz Henc
