potiuk commented on code in PR #27892:
URL: https://github.com/apache/airflow/pull/27892#discussion_r1040941266


##########
airflow/config_templates/config.yml:
##########
@@ -418,6 +418,18 @@
       type: string
       default: ~
       example: '{"some_param": "some_value"}'
+    - name: database_access_isolation
+      description: Whether components should use Airflow Internal API for DB connectivity.

Review Comment:
   Could we please add `(experimental)` to the description of these options? I think it will buy us some time to get this "prime-time ready", and even then the feature should stay experimental for a while so we can gather feedback and adapt it to findings from bigger, regular customers (same with the OpenAPI specs: the experimental status should be very clear everywhere an end-user is involved).
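   For instance, the config.yml entry could read something like this (a sketch, not the final wording):
   
   ```yaml
   - name: database_access_isolation
     description: (experimental) Whether components should use Airflow Internal API for DB connectivity.
     version_added: 2.6.0
     type: boolean
     example: ~
     default: "False"
   ```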



##########
airflow/api_internal/internal_api_call.py:
##########
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import inspect
+import json
+from functools import wraps
+from typing import Callable, TypeVar
+
+import requests
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.serialization.serialized_objects import BaseSerialization
+from airflow.typing_compat import ParamSpec
+
+PS = ParamSpec("PS")
+RT = TypeVar("RT")
+
+
+class InternalApiConfig:
+    """Stores and caches configuration for Internal API."""
+
+    _initialized = False
+    _use_internal_api = False
+    _internal_api_endpoint = ""
+
+    @staticmethod
+    def get_use_internal_api():
+        if not InternalApiConfig._initialized:
+            InternalApiConfig._init_values()
+        return InternalApiConfig._use_internal_api
+
+    @staticmethod
+    def get_internal_api_endpoint():
+        if not InternalApiConfig._initialized:
+            InternalApiConfig._init_values()
+        return InternalApiConfig._internal_api_endpoint
+
+    @staticmethod
+    def _init_values():
+        use_internal_api = conf.getboolean("core", "database_access_isolation")
+        internal_api_endpoint = ""
+        if use_internal_api:
+            internal_api_url = conf.get("core", "database_api_url")
+            internal_api_endpoint = internal_api_url + "/internal/v1/rpcapi"
+            if not internal_api_endpoint.startswith("http://"):
+                raise AirflowConfigException("[core]database_api_url must start with http://")
+
+        InternalApiConfig._initialized = True
+        InternalApiConfig._use_internal_api = use_internal_api
+        InternalApiConfig._internal_api_endpoint = internal_api_endpoint
+
+
+def internal_api_call(func: Callable[PS, RT | None]) -> Callable[PS, RT | None]:
+    """Decorator for methods which may be executed in database isolation mode.
+
+    If [core]database_access_isolation is true then such methods are not executed locally,
+    but instead an RPC call is made to the Database API (aka Internal API). This makes some
+    components decouple from direct Airflow database access.
+    Each decorated method must be present in the METHODS list in airflow.api_internal.endpoints.rpc_api_endpoint.

Review Comment:
   We could add a pre-commit to check this, but I can add it as a follow-up 
after this PR is merged.
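   A minimal sketch of what such a pre-commit check could do (the helper names below are made up; the real hook would scan the source tree and compare against the actual METHODS list in `rpc_api_endpoint`):
   
   ```python
   from __future__ import annotations
   
   import ast
   
   
   def find_decorated_functions(source: str) -> set[str]:
       """Collect names of functions decorated with @internal_api_call in a source string."""
       decorated: set[str] = set()
       for node in ast.walk(ast.parse(source)):
           if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
               for dec in node.decorator_list:
                   # Handle both @internal_api_call and @module.internal_api_call forms.
                   name = dec.id if isinstance(dec, ast.Name) else getattr(dec, "attr", None)
                   if name == "internal_api_call":
                       decorated.add(node.name)
       return decorated
   
   
   def missing_from_methods(decorated: set[str], registered: set[str]) -> set[str]:
       """Return decorated names that are absent from the METHODS registry."""
       return decorated - registered
   ```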



##########
airflow/api_internal/openapi/internal_api_v1.yaml:
##########
@@ -0,0 +1,92 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+openapi: 3.0.2
+info:
+  title: Airflow Internal API
+  version: 1.0.0
+  description: |
+    This is Airflow Internal API - which is a proxy for components running
+    customer code for connecting to Airflow Database.
+
+    It is not intended to be used by any external code.
+
+    You can find more information in AIP-44
+    https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
+
+
+servers:
+  - url: /internal/v1
+    description: Airflow Internal API
+paths:
+  "/rpcapi":
+    post:
+      operationId: rpcapi
+      deprecated: false
+      x-openapi-router-controller: airflow.api_internal.endpoints.rpc_api_endpoint
+      operationId: json_rpc
+      tags:
+      - JSONRPC
+      parameters: []
+      responses:
+        '200':
+          description: Successful response
+      requestBody:
+        x-body-name: body
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required:
+              - method
+              - jsonrpc
+              - params
+              properties:
+                jsonrpc:
+                  type: string
+                  default: '2.0'
+                  description: JSON-RPC Version (2.0)
+                method:
+                  type: string
+                  description: Method name
+                params:
+                  title: Parameters
+                  type: string
+x-headers: []
+x-explorer-enabled: true
+x-proxy-enabled: true
+x-samples-enabled: true
+components:
+  schemas:
+    JsonRpcRequired:

Review Comment:
   This is how the OpenAPI specification describes its conformance with JSON-RPC. We should have it in. Various tools that might later be employed can use this information (Swagger UI, for example, to generate the documentation and describe the API).
   
   This is not crucial in our case; the Swagger UI and other OpenAPI artifacts are really a "side-effect" here. We have no one to consume those artifacts, as our API is purely internal and most of it is not really "fixed" and described (i.e. the actual methods and parameters are not validated/processed by the API specification, because they are dynamically generated on both client and server side). So this is not strictly needed, but it might be useful in the future in case we build any additional tooling that relies on inspecting the specification and using the "metadata" about JSON-RPC under the hood.
   
   BTW, it is perfectly OK that we do not have those methods and parameters described. This is not the "usual" API that you expose and document. Both client and server in this communication are guaranteed to have a single source of truth (inspection of the method names and their parameters, serialized using our own serializer).
   
   So we are good here. The OpenAPI specification we have looks cool:
   
   * single endpoint
   * JSON-RPC conformance
   * no boilerplate growing with every method added
   * single source of truth for the actual "schema" of passed data
   * the data is easy to inspect by a human without extra tools (method names and parameters sent as JSON-serialized data)
   * guaranteed client-server compatibility
   
   I really like it :) 
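   To make the wire format concrete, the request body is just a plain JSON-RPC 2.0 envelope. This sketch mirrors the payload asserted in the PR's tests (the helper function itself is made up for illustration):
   
   ```python
   import json
   
   
   def build_internal_api_request(method: str, params_json: str) -> str:
       """Build the JSON-RPC 2.0 body that the decorator POSTs to /rpcapi."""
       return json.dumps(
           {
               "jsonrpc": "2.0",  # fixed protocol version
               "method": method,  # fully qualified name of the decorated method
               "params": params_json,  # already serialized with Airflow's BaseSerialization
           }
       )
   
   
   # Mirrors the expected_data in tests/api_internal/test_internal_api_call.py:
   body = build_internal_api_request(
       "tests.api_internal.test_internal_api_call.fake_method",
       '{"__var": {}, "__type": "dict"}',
   )
   ```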
   



##########
airflow/api_internal/openapi/internal_api_v1.yaml:
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+openapi: 3.0.2
+info:
+  title: Airflow Internal API
+  version: 1.0.0
+  description: |
+    This is Airflow Internal API - which is a proxy for components running
+    customer code for connecting to Airflow Database.
+
+    It is not intended to be used by any external code.
+
+    You can find more information in AIP-44
+    https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
+
+
+servers:
+  - url: /internal/v1
+    description: Airflow Internal API
+paths:
+  "/rpcapi":
+    post:
+      operationId: rpcapi
+      deprecated: false
+      x-openapi-router-controller: airflow.api_internal.endpoints.rpc_api_endpoint
+      operationId: json_rpc

Review Comment:
   The primary use for operationId is for code generators to generate method names. In our case, when we dynamically generate the "actual" method names and parameters and pass them via the JSON-RPC construct, it does not really matter (there will always be a single method to call), but I agree with @vincbeck that `json_rpc` is a poor name here.
   
   How about `internal_airflow_api`
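   i.e. something like (sketch):
   
   ```yaml
   paths:
     "/rpcapi":
       post:
         operationId: internal_airflow_api
         x-openapi-router-controller: airflow.api_internal.endpoints.rpc_api_endpoint
   ```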



##########
airflow/www/extensions/init_views.py:
##########
@@ -220,6 +220,27 @@ def _handle_method_not_allowed(ex):
     app.extensions["csrf"].exempt(api_bp)
 
 
+def init_api_internal(app: Flask) -> None:
+    """Initialize Internal API"""
+    if not conf.getboolean("webserver", "run_internal_api", fallback=False):
+        return
+    base_path = "/internal/v1"

Review Comment:
   I have a feeling this should be a slightly more descriptive URL: `/internal_api/v1`? That would basically match the OpenAPI specification file name (`internal_api_v1.yaml`).



##########
airflow/config_templates/config.yml:
##########
@@ -418,6 +418,18 @@
       type: string
       default: ~
       example: '{"some_param": "some_value"}'
+    - name: database_access_isolation
+      description: Whether components should use Airflow Internal API for DB connectivity.
+      version_added: 2.6.0
+      type: boolean
+      example: ~
+      default: "False"
+    - name: database_api_url

Review Comment:
   Yes. But this is also temporary (we need to be aware this is not yet a "production-ready" setup). I imagine in the final version we should be able to talk to one of many instances of the internal API server. I can very easily imagine having 1000s of workers and 10 different webservers (or just internal API servers) they talk to.
   
   Eventually we will have to figure out how this can be done while following the Airflow approach that each component has to have the same configuration of Airflow.
   
   We have not yet discussed how to do this (it could be a random selection among multiple hosts as a simple approach to start with, or we could add some smarts to reconnect when one server becomes too busy, or it could be done via external load balancing).
   
   But I think we should have the ability to specify multiple hosts here, and in case we have more than one, connecting to a randomly chosen one out of the list would be the right implementation for now, I think.
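   As a sketch of the "random selection" idea (multi-host support in `[core]database_api_url` is hypothetical at this point, and the helper name is made up):
   
   ```python
   from __future__ import annotations
   
   import random
   
   
   def pick_internal_api_endpoint(database_api_url: str) -> str:
       """Pick one endpoint from a comma-separated list of internal API hosts.
   
       Naive sketch: a random choice spreads load across servers without any
       coordination; health checks, reconnect-on-busy logic, or external load
       balancing could replace it later.
       """
       hosts = [h.strip() for h in database_api_url.split(",") if h.strip()]
       if not hosts:
           raise ValueError("[core]database_api_url must list at least one host")
       return random.choice(hosts) + "/internal/v1/rpcapi"
   ```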
   
   
   



##########
tests/api_internal/test_internal_api_call.py:
##########
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from __future__ import annotations
+
+import json
+import unittest
+from unittest import mock
+
+import requests
+
+from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
+from airflow.serialization.serialized_objects import BaseSerialization
+from tests.test_utils.config import conf_vars
+
+
+class TestInternalApiConfig(unittest.TestCase):
+    def setUp(self):
+        InternalApiConfig._initialized = False
+
+    @conf_vars(
+        {
+            ("core", "database_access_isolation"): "false",
+            ("core", "database_api_url"): "http://localhost:8888",
+        }
+    )
+    def test_get_use_internal_api_disabled(self):
+        self.assertFalse(InternalApiConfig.get_use_internal_api())
+
+    @conf_vars(
+        {
+            ("core", "database_access_isolation"): "true",
+            ("core", "database_api_url"): "http://localhost:8888",
+        }
+    )
+    def test_get_use_internal_api_enabled(self):
+        self.assertTrue(InternalApiConfig.get_use_internal_api())
+        self.assertEqual(
+            InternalApiConfig.get_internal_api_endpoint(),
+            "http://localhost:8888/internal/v1/rpcapi",
+        )
+
+
+@internal_api_call
+def fake_method() -> str:
+    return "local-call"
+
+
+@internal_api_call
+def fake_method_with_params(dag_id: str, task_id: int) -> str:
+    return f"local-call-with-params-{dag_id}-{task_id}"
+
+
+class TestInternalApiCall(unittest.TestCase):
+    def setUp(self):
+        InternalApiConfig._initialized = False
+
+    @conf_vars(
+        {
+            ("core", "database_access_isolation"): "false",
+            ("core", "database_api_url"): "http://localhost:8888",
+        }
+    )
+    @mock.patch("airflow.api_internal.internal_api_call.requests")
+    def test_local_call(self, mock_requests):
+        result = fake_method()
+
+        self.assertEqual(result, "local-call")
+        mock_requests.post.assert_not_called()
+
+    @conf_vars(
+        {
+            ("core", "database_access_isolation"): "true",
+            ("core", "database_api_url"): "http://localhost:8888",
+        }
+    )
+    @mock.patch("airflow.api_internal.internal_api_call.requests")
+    def test_remote_call(self, mock_requests):
+        response = requests.Response()
+        response.status_code = 200
+
+        response._content = json.dumps(BaseSerialization.serialize("remote-call"))
+
+        mock_requests.post.return_value = response
+
+        result = fake_method()
+        self.assertEqual(result, "remote-call")
+        expected_data = json.dumps(
+            {
+                "jsonrpc": "2.0",
+                "method": "tests.api_internal.test_internal_api_call.fake_method",
+                "params": '{"__var": {}, "__type": "dict"}',
+            }
+        )
+        mock_requests.post.assert_called_once_with(
+            url="http://localhost:8888/internal/v1/rpcapi",
+            data=expected_data,
+            headers={"Content-Type": "application/json"},
+        )
+
+    @conf_vars(
+        {
+            ("core", "database_access_isolation"): "true",
+            ("core", "database_api_url"): "http://localhost:8888",
+        }
+    )
+    @mock.patch("airflow.api_internal.internal_api_call.requests")
+    def test_remote_call_with_params(self, mock_requests):

Review Comment:
   Yeah, let's keep them as parameters. Sometimes coupling costs are bigger than DRY benefits (especially for tests, where DAMP is better than DRY).



##########
airflow/api_internal/internal_api_call.py:
##########
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import inspect
+import json
+from functools import wraps
+from typing import Callable, TypeVar
+
+import requests
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.serialization.serialized_objects import BaseSerialization
+from airflow.typing_compat import ParamSpec
+
+PS = ParamSpec("PS")
+RT = TypeVar("RT")
+
+
+class InternalApiConfig:
+    """Stores and caches configuration for Internal API."""
+
+    _initialized = False
+    _use_internal_api = False
+    _internal_api_endpoint = ""
+
+    @staticmethod
+    def get_use_internal_api():
+        if not InternalApiConfig._initialized:
+            InternalApiConfig._init_values()
+        return InternalApiConfig._use_internal_api
+
+    @staticmethod
+    def get_internal_api_endpoint():
+        if not InternalApiConfig._initialized:
+            InternalApiConfig._init_values()
+        return InternalApiConfig._internal_api_endpoint
+
+    @staticmethod
+    def _init_values():
+        use_internal_api = conf.getboolean("core", "database_access_isolation")
+        internal_api_endpoint = ""
+        if use_internal_api:
+            internal_api_url = conf.get("core", "database_api_url")
+            internal_api_endpoint = internal_api_url + "/internal/v1/rpcapi"
+            if not internal_api_endpoint.startswith("http://"):
+                raise AirflowConfigException("[core]database_api_url must start with http://")
+
+        InternalApiConfig._initialized = True
+        InternalApiConfig._use_internal_api = use_internal_api
+        InternalApiConfig._internal_api_endpoint = internal_api_endpoint
+
+
+def internal_api_call(func: Callable[PS, RT | None]) -> Callable[PS, RT | None]:
+    """Decorator for methods which may be executed in database isolation mode.
+
+    If [core]database_access_isolation is true then such methods are not executed locally,
+    but instead an RPC call is made to the Database API (aka Internal API). This makes some
+    components stop depending on Airflow database access.
+    Each decorated method must be present in the METHODS list in airflow.api_internal.endpoints.rpc_api_endpoint.
+    Only static methods can be decorated. This decorator must be placed before "provide_session".
+
+    See AIP-44 for more information.

Review Comment:
   Link here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
