vincbeck commented on code in PR #27892:
URL: https://github.com/apache/airflow/pull/27892#discussion_r1032587457


##########
airflow/api_internal/endpoints/rpc_api_endpoint.py:
##########
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+
+from flask import Response
+
+from airflow.api_connexion.types import APIResponse
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.serialization.serialized_objects import BaseSerialization
+
+log = logging.getLogger(__name__)
+
+METHODS = {
+    "dag_processing.processor.update_import_errors": DagFileProcessor.update_import_errors,
+}
+
+
+def json_rpc(
+    body: dict,
+) -> APIResponse:
+    """Handler for Internal API /internal/v1/rpcapi endpoint."""
+    log.debug("Got request")
+    json_rpc = body.get("jsonrpc")
+    if json_rpc != "2.0":
+        log.warning("Not jsonrpc-2.0 request")
+        return Response(response="Expected jsonrpc 2.0 request.", status=400)
+
+    method_name = str(body.get("method"))
+    if method_name not in METHODS:
+        log.warning("Unrecognized method: %", method_name)
+        return Response(response=f"Unrecognized method: {method_name}", status=400)
+
+    params_json = body.get("params")
+    if not params_json:
+        params_json = "{}"
+    handler = METHODS[method_name]
+    try:
+        params = BaseSerialization.deserialize(json.loads(params_json))
+    except Exception as err:
+        log.warning("Error deserializing parameters.")
+        log.warning(err)

Review Comment:
   nit. Shouldn't it be an error?
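A minimal sketch of what logging this failure as an error could look like. It assumes plain `json.loads` stands in for `BaseSerialization.deserialize`, and that the caller turns the returned `(message, status)` pair into a Flask `Response`. `deserialize_params` and the logger name are hypothetical. `log.exception` logs at ERROR level and attaches the traceback, so it replaces the two separate `log.warning` calls with one.

```python
import json
import logging

log = logging.getLogger("rpc_api_sketch")  # hypothetical logger name


def deserialize_params(params_json: str):
    """Parse RPC params; return (params, None) on success or (None, (message, status))."""
    try:
        # Stand-in for BaseSerialization.deserialize(json.loads(params_json)).
        return json.loads(params_json), None
    except Exception:  # broad, mirroring the original handler
        # ERROR level plus the traceback, in a single call.
        log.exception("Error deserializing parameters.")
        return None, ("Error deserializing parameters.", 400)
```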



##########
airflow/api_internal/endpoints/rpc_api_endpoint.py:
##########
@@ -0,0 +1,70 @@
+
+from __future__ import annotations
+
+import json
+import logging
+
+from flask import Response
+
+from airflow.api_connexion.types import APIResponse
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.serialization.serialized_objects import BaseSerialization
+
+log = logging.getLogger(__name__)
+
+METHODS = {
+    "dag_processing.processor.update_import_errors": DagFileProcessor.update_import_errors,
+}
+
+
+def json_rpc(
+    body: dict,
+) -> APIResponse:
+    """Handler for Internal API /internal/v1/rpcapi endpoint."""
+    log.debug("Got request")
+    json_rpc = body.get("jsonrpc")
+    if json_rpc != "2.0":
+        log.warning("Not jsonrpc-2.0 request")
+        return Response(response="Expected jsonrpc 2.0 request.", status=400)
+
+    method_name = str(body.get("method"))
+    if method_name not in METHODS:
+        log.warning("Unrecognized method: %", method_name)

Review Comment:
   nit. Shouldn't it be an error?



##########
airflow/api_internal/endpoints/rpc_api_endpoint.py:
##########
@@ -0,0 +1,70 @@
+
+from __future__ import annotations
+
+import json
+import logging
+
+from flask import Response
+
+from airflow.api_connexion.types import APIResponse
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.serialization.serialized_objects import BaseSerialization
+
+log = logging.getLogger(__name__)
+
+METHODS = {
+    "dag_processing.processor.update_import_errors": DagFileProcessor.update_import_errors,
+}
+
+
+def json_rpc(
+    body: dict,
+) -> APIResponse:
+    """Handler for Internal API /internal/v1/rpcapi endpoint."""
+    log.debug("Got request")
+    json_rpc = body.get("jsonrpc")
+    if json_rpc != "2.0":
+        log.warning("Not jsonrpc-2.0 request")
+        return Response(response="Expected jsonrpc 2.0 request.", status=400)
+
+    method_name = str(body.get("method"))
+    if method_name not in METHODS:
+        log.warning("Unrecognized method: %", method_name)
+        return Response(response=f"Unrecognized method: {method_name}", status=400)
+
+    params_json = body.get("params")
+    if not params_json:
+        params_json = "{}"
+    handler = METHODS[method_name]
+    try:
+        params = BaseSerialization.deserialize(json.loads(params_json))
+    except Exception as err:
+        log.warning("Error deserializing parameters.")
+        log.warning(err)
+        return Response(response="Error deserializing parameters.", status=400)
+
+    log.debug("Calling method %.", {method_name})
+    handler = METHODS[method_name]
+    output = handler(**params)

Review Comment:
   What if this function raises an exception? What is the behavior here? Should we catch any exception and, in that case, return a `Response` with `status=500`, or does that come out of the box?
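One way to make the behavior explicit, sketched under assumptions: `call_handler` is a hypothetical helper, and it returns a `(body, status)` pair instead of a Flask `Response` so the sketch runs without Flask. Flask typically turns an unhandled exception into a generic 500 on its own (unless exceptions are propagated, as in testing mode), so the main gain from catching it here is control over what gets logged and what body the client sees.

```python
from __future__ import annotations

import logging
from http import HTTPStatus
from typing import Any, Callable

log = logging.getLogger("rpc_api_sketch")  # hypothetical logger name


def call_handler(handler: Callable[..., Any], params: dict[str, Any]) -> tuple[Any, int]:
    """Run an RPC handler, mapping any failure to HTTP 500 (hypothetical helper)."""
    try:
        return handler(**params), HTTPStatus.OK
    except Exception:
        # Keep the full traceback in the server log; send only a generic message back.
        log.exception("Error executing method %s", getattr(handler, "__name__", repr(handler)))
        return "Error executing method.", HTTPStatus.INTERNAL_SERVER_ERROR
```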



##########
airflow/api_internal/endpoints/rpc_api_endpoint.py:
##########
@@ -0,0 +1,70 @@
+
+from __future__ import annotations
+
+import json
+import logging
+
+from flask import Response
+
+from airflow.api_connexion.types import APIResponse
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.serialization.serialized_objects import BaseSerialization
+
+log = logging.getLogger(__name__)
+
+METHODS = {
+    "dag_processing.processor.update_import_errors": DagFileProcessor.update_import_errors,
+}
+
+
+def json_rpc(
+    body: dict,
+) -> APIResponse:
+    """Handler for Internal API /internal/v1/rpcapi endpoint."""
+    log.debug("Got request")
+    json_rpc = body.get("jsonrpc")
+    if json_rpc != "2.0":
+        log.warning("Not jsonrpc-2.0 request")
+        return Response(response="Expected jsonrpc 2.0 request.", status=400)
+
+    method_name = str(body.get("method"))
+    if method_name not in METHODS:
+        log.warning("Unrecognized method: %", method_name)
+        return Response(response=f"Unrecognized method: {method_name}", status=400)
+
+    params_json = body.get("params")
+    if not params_json:
+        params_json = "{}"

Review Comment:
   ```suggestion
       params_json = body.get("params", "{}")
   ```
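The suggestion is shorter but not exactly equivalent: `dict.get(key, default)` only falls back when the key is missing, whereas the original `if not params_json` also covers `"params": null` and `"params": ""`. If those cases should keep working, `body.get("params") or "{}"` preserves the original behavior in one line. The two helper names below are hypothetical and exist only to compare the behaviors side by side.

```python
def params_with_default(body: dict) -> object:
    # Falls back only when the "params" key is absent.
    return body.get("params", "{}")


def params_with_or(body: dict) -> object:
    # Falls back for any falsy value: missing key, None, or "".
    return body.get("params") or "{}"


for body in ({}, {"params": None}, {"params": ""}, {"params": '{"a": 1}'}):
    print(body, repr(params_with_default(body)), repr(params_with_or(body)))
```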



##########
airflow/dag_processing/processor.py:
##########
@@ -522,16 +522,20 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
             session.commit()
 
     @staticmethod
-    def update_import_errors(session: Session, dagbag: DagBag) -> None:
+    @internal_api_call("dag_processing.processor.update_import_errors")

Review Comment:
   The string `dag_processing.processor.update_import_errors` is redundant with the method name. Isn't it possible to derive it inside the `internal_api_call` definition, with something like `func.__module__`?
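A sketch of deriving the name, assuming a hypothetical `rpc_method` decorator and `RPC_METHODS` registry (not the PR's `internal_api_call`). Because `@staticmethod` is applied last, the decorator receives the plain function, whose `__qualname__` already includes the class name. The derived key would then be the fully qualified `airflow.dag_processing.processor.DagFileProcessor.update_import_errors` rather than the current hand-written string, so the endpoint's lookup table would need to be built from the same registry.

```python
from __future__ import annotations

import functools
from typing import Any, Callable

# Hypothetical registry playing the role of METHODS in rpc_api_endpoint.py.
RPC_METHODS: dict[str, Callable[..., Any]] = {}


def rpc_method(func: Callable[..., Any]) -> Callable[..., Any]:
    """Register func under "<module>.<qualname>" instead of a hand-written string."""
    method_name = f"{func.__module__}.{func.__qualname__}"
    RPC_METHODS[method_name] = func

    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # A real implementation would choose here between a local call and an RPC call.
        return func(*args, **kwargs)

    return wrapper


class ExampleProcessor:  # hypothetical stand-in for DagFileProcessor
    @staticmethod
    @rpc_method
    def update_import_errors(session: object = None, dagbag: object = None) -> None:
        pass


print(sorted(RPC_METHODS))
```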



##########
airflow/www/app.py:
##########
@@ -148,7 +149,9 @@ def create_app(config=None, testing=False):
         init_plugins(flask_app)
         init_connection_form()
         init_error_handlers(flask_app)
-        init_api_connexion(flask_app)
+        if conf.get("webserver", "run_internal_api"):
+            init_api_connexion(flask_app)
+        init_api_internal(flask_app)

Review Comment:
   I am not sure what you were trying to achieve here, but it seems you are disabling the REST API if `run_internal_api` is `False`.
   ```suggestion
           if conf.get("webserver", "run_internal_api"):
               init_api_internal(flask_app)
           init_api_connexion(flask_app)
   ```
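Separately from the ordering, the condition itself may deserve a check: Airflow's `conf.get` returns the raw option string, and a non-empty string such as `"False"` is truthy, so `if conf.get(...)` would take the branch even when the option is set to `False`. Airflow's config object also offers `getboolean`, which is presumably what this check wants. The sketch uses the standard-library `configparser` as a stand-in to show the difference.

```python
import configparser

# Stand-in for airflow.cfg, mirroring "run_internal_api = False" under [webserver].
parser = configparser.ConfigParser()
parser.read_dict({"webserver": {"run_internal_api": "False"}})

raw_value = parser.get("webserver", "run_internal_api")    # the string "False"
flag = parser.getboolean("webserver", "run_internal_api")  # the bool False

print(repr(raw_value), bool(raw_value), flag)
```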



##########
airflow/api_internal/endpoints/rpc_api_endpoint.py:
##########
@@ -0,0 +1,70 @@
+
+from __future__ import annotations
+
+import json
+import logging
+
+from flask import Response
+
+from airflow.api_connexion.types import APIResponse
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.serialization.serialized_objects import BaseSerialization
+
+log = logging.getLogger(__name__)
+
+METHODS = {
+    "dag_processing.processor.update_import_errors": DagFileProcessor.update_import_errors,
+}
+
+
+def json_rpc(
+    body: dict,
+) -> APIResponse:
+    """Handler for Internal API /internal/v1/rpcapi endpoint."""
+    log.debug("Got request")
+    json_rpc = body.get("jsonrpc")
+    if json_rpc != "2.0":
+        log.warning("Not jsonrpc-2.0 request")
+        return Response(response="Expected jsonrpc 2.0 request.", status=400)
+
+    method_name = str(body.get("method"))
+    if method_name not in METHODS:
+        log.warning("Unrecognized method: %", method_name)
+        return Response(response=f"Unrecognized method: {method_name}", status=400)
+
+    params_json = body.get("params")
+    if not params_json:
+        params_json = "{}"
+    handler = METHODS[method_name]
+    try:
+        params = BaseSerialization.deserialize(json.loads(params_json))
+    except Exception as err:
+        log.warning("Error deserializing parameters.")
+        log.warning(err)
+        return Response(response="Error deserializing parameters.", status=400)
+
+    log.debug("Calling method %.", {method_name})
+    handler = METHODS[method_name]
+    output = handler(**params)
+    if output:
+        output_json = BaseSerialization.serialize(json.dumps(output))
+    else:
+        output_json = ""
+    log.debug("Returning response")
+    return Response(response=str(output_json), headers={"Content-Type": "application/json"})

Review Comment:
   ```suggestion
       output_json = BaseSerialization.serialize(json.dumps(output))
       log.debug("Returning response")
       return Response(response=str(output_json or ''), headers={"Content-Type": "application/json"})
   ```
   
   Is an empty string valid JSON? I am wondering whether we should return an empty object `{}` instead when `output_json` is `None`.
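On the question above: an empty string is not valid JSON (a JSON text must contain a value), so a client that calls `json.loads` on the response body would fail, while `{}` and `null` both parse. A quick standard-library check; `is_valid_json` is a hypothetical helper name.

```python
import json


def is_valid_json(text: str) -> bool:
    """Return True if text parses as a JSON document."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


for candidate in ("", "{}", json.dumps(None)):  # json.dumps(None) produces "null"
    print(repr(candidate), is_valid_json(candidate))
```

Returning `null` rather than `{}` might be worth considering if callers need to distinguish "no result" from an empty object.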



##########
airflow/api_internal/endpoints/rpc_api_endpoint.py:
##########
@@ -0,0 +1,70 @@
+
+from __future__ import annotations
+
+import json
+import logging
+
+from flask import Response
+
+from airflow.api_connexion.types import APIResponse
+from airflow.dag_processing.processor import DagFileProcessor
+from airflow.serialization.serialized_objects import BaseSerialization
+
+log = logging.getLogger(__name__)
+
+METHODS = {
+    "dag_processing.processor.update_import_errors": DagFileProcessor.update_import_errors,
+}
+
+
+def json_rpc(
+    body: dict,
+) -> APIResponse:
+    """Handler for Internal API /internal/v1/rpcapi endpoint."""
+    log.debug("Got request")
+    json_rpc = body.get("jsonrpc")
+    if json_rpc != "2.0":
+        log.warning("Not jsonrpc-2.0 request")

Review Comment:
   nit. Shouldn't it be an error?



##########
airflow/api_internal/openapi/internal_api_v1.yaml:
##########
@@ -0,0 +1,100 @@
+
+---
+openapi: 3.0.2
+info:
+  title: Airflow Internal API
+  version: 1.0.0
+  description: |
+    This is Airflow Internal API - which is a proxy for components running
+    customer code for connecting to Airflow Database.
+
+    It is not intended to be used by any external code.
+
+    You can find more information in AIP-44
+    https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
+
+
+servers:
+  - url: /internal/v1
+    description: Airflow Internal API
+paths:
+  "/rpcapi":
+    post:
+      operationId: rpcapi
+      deprecated: false
+      x-openapi-router-controller: airflow.api_internal.endpoints.rpc_api_endpoint
+      operationId: json_rpc

Review Comment:
   I am not an expert in `openapi`, but from the [doc](https://swagger.io/docs/specification/paths-and-operations/) it seems this field is used to identify an operation and must be unique across all operations. While that holds here because we have only one operation, I don't think `json_rpc` is a very good identifier for the operation (since the value will always be `2.0`). The doc also says this field is optional; shouldn't we drop it then?



##########
airflow/api_internal/openapi/internal_api_v1.yaml:
##########
@@ -0,0 +1,100 @@
+
+---
+openapi: 3.0.2
+info:
+  title: Airflow Internal API
+  version: 1.0.0
+  description: |
+    This is Airflow Internal API - which is a proxy for components running
+    customer code for connecting to Airflow Database.
+
+    It is not intended to be used by any external code.
+
+    You can find more information in AIP-44
+    https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
+
+
+servers:
+  - url: /internal/v1
+    description: Airflow Internal API
+paths:
+  "/rpcapi":
+    post:
+      operationId: rpcapi
+      deprecated: false
+      x-openapi-router-controller: airflow.api_internal.endpoints.rpc_api_endpoint
+      operationId: json_rpc
+      tags:
+      - JSONRPC
+      parameters: []
+      responses:
+        '200':
+          description: Successful response
+      requestBody:
+        x-body-name: body
+        required: true
+        content:
+          application/json:
+            schema:
+              type: object
+              required:
+              - method
+              - jsonrpc
+              - params
+              properties:
+                jsonrpc:
+                  type: string
+                  default: '2.0'
+                  description: JSON-RPC Version (2.0)
+                method:
+                  type: string
+                  description: Method name
+                params:
+                  title: Parameters
+                  type: string
+x-headers: []
+x-explorer-enabled: true
+x-proxy-enabled: true
+x-samples-enabled: true
+components:
+  schemas:
+    JsonRpcRequired:
+      type: object
+      required:
+      - method
+      - jsonrpc
+      properties:
+        method:
+          type: string
+          description: Method name
+        jsonrpc:
+          type: string
+          default: '2.0'
+          description: JSON-RPC Version (2.0)
+      discriminator:
+        propertyName: method_name
+    examplePost:

Review Comment:
   This one does not seem to be used. What is its purpose?



##########
airflow/api_internal/internal_api_call.py:
##########
@@ -0,0 +1,84 @@
+
+from __future__ import annotations
+
+import inspect
+import json
+from typing import Callable, TypeVar
+
+import requests
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException
+from airflow.serialization.serialized_objects import BaseSerialization
+from airflow.typing_compat import ParamSpec
+
+PS = ParamSpec("PS")
+RT = TypeVar("RT")
+
+_use_internal_api = conf.get("core", "database_access_isolation")
+_internal_api_url = conf.get("core", "database_api_url")
+
+_internal_api_endpoint = _internal_api_url + "/internal/v1/rpcapi"
+if not _internal_api_endpoint.startswith("http://"):
+    _internal_api_endpoint = "http://" + _internal_api_endpoint

Review Comment:
   Shouldn't we raise an exception in this case? I am worried that if you make a typo in `database_api_url`, for instance setting it to `http//......` (the `:` is missing), you end up with `_internal_api_endpoint` set to `http://http//.......`. I would rather be strict about the data format and complain if it does not fit the schema than try to fix it and end up in a worse situation than the original one.
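A minimal sketch of the stricter approach, assuming `urllib.parse.urlsplit` is an acceptable check and that both `http` and `https` should be allowed. `build_internal_api_endpoint` is a hypothetical name, and the sketch raises `ValueError` only to stay self-contained; the real module would more likely raise `AirflowException`, which it already imports.

```python
from urllib.parse import urlsplit


def build_internal_api_endpoint(database_api_url: str) -> str:
    """Validate [core] database_api_url instead of silently patching it (hypothetical helper)."""
    parts = urlsplit(database_api_url)
    # "http//host:9080" has no scheme and no netloc, so it is rejected rather than rewritten.
    if parts.scheme not in ("http", "https") or not parts.netloc:
        raise ValueError(
            f"Invalid [core] database_api_url {database_api_url!r}: "
            "expected an absolute http(s) URL such as http://localhost:9080"
        )
    return database_api_url.rstrip("/") + "/internal/v1/rpcapi"


print(build_internal_api_endpoint("http://localhost:9080"))
```

Unlike the current code, this also rejects a bare `host:port` value, which is the point: the configured URL has to spell out its scheme.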



##########
airflow/config_templates/default_airflow.cfg:
##########
@@ -739,6 +746,9 @@ audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,g
 # Example: audit_view_included_events = dagrun_cleared,failed
 # audit_view_included_events =
 
+# Boolean for for running Internal API in the webserver.

Review Comment:
   ```suggestion
   # Boolean for running Internal API in the webserver.
   ```



##########
airflow/config_templates/config.yml:
##########
@@ -1462,6 +1474,13 @@
       type: string
       example: "dagrun_cleared,failed"
       default: ~
+    - name: run_internal_api
+      description: |
+        Boolean for for running Internal API in the webserver.

Review Comment:
   ```suggestion
           Boolean for running Internal API in the webserver.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
