vincbeck commented on code in PR #40916:
URL: https://github.com/apache/airflow/pull/40916#discussion_r1686761120


##########
airflow/api_internal/internal_api_call.py:
##########
@@ -42,67 +42,57 @@
 class InternalApiConfig:
     """Stores and caches configuration for Internal API."""
 
-    _initialized = False
     _use_internal_api = False
     _internal_api_endpoint = ""
 
     @staticmethod
-    def force_database_direct_access(message: str):
+    def set_use_database_access(component: str):
         """
         Block current component from using Internal API.
 
         All methods decorated with internal_api_call will always be executed 
locally.`
         This mode is needed for "trusted" components like Scheduler, 
Webserver, Internal Api server
         """
-        InternalApiConfig._initialized = True
         InternalApiConfig._use_internal_api = False
-        if _ENABLE_AIP_44:
-            logger.info("Forcing database direct access. %s", message)
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        logger.info(
+            "DB isolation mode. But this is a trusted component and DB 
connection is set. "
+            "Using database direct access when running %s.",
+            component,
+        )
 
     @staticmethod
-    def force_api_access(api_endpoint: str):
-        """
-        Force using Internal API with provided endpoint.
+    def set_use_internal_api(component: str):
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        internal_api_endpoint = ""

Review Comment:
   ```suggestion
   ```



##########
airflow/api_internal/internal_api_call.py:
##########
@@ -42,67 +42,57 @@
 class InternalApiConfig:
     """Stores and caches configuration for Internal API."""
 
-    _initialized = False
     _use_internal_api = False
     _internal_api_endpoint = ""
 
     @staticmethod
-    def force_database_direct_access(message: str):
+    def set_use_database_access(component: str):
         """
         Block current component from using Internal API.
 
         All methods decorated with internal_api_call will always be executed 
locally.`
         This mode is needed for "trusted" components like Scheduler, 
Webserver, Internal Api server
         """
-        InternalApiConfig._initialized = True
         InternalApiConfig._use_internal_api = False
-        if _ENABLE_AIP_44:
-            logger.info("Forcing database direct access. %s", message)
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        logger.info(
+            "DB isolation mode. But this is a trusted component and DB 
connection is set. "
+            "Using database direct access when running %s.",
+            component,
+        )
 
     @staticmethod
-    def force_api_access(api_endpoint: str):
-        """
-        Force using Internal API with provided endpoint.
+    def set_use_internal_api(component: str):
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        internal_api_endpoint = ""
+        internal_api_url = conf.get("core", "internal_api_url")
+        internal_api_endpoint = internal_api_url + "/internal_api/v1/rpcapi"
+        if not internal_api_endpoint.startswith("http://";):
+            raise AirflowConfigException("[core]internal_api_url must start 
with http://";)
+        url_conf = urlparse(conf.get("core", "internal_api_url"))

Review Comment:
   ```suggestion
           url_conf = urlparse(internal_api_url)
   ```



##########
airflow/api_internal/internal_api_call.py:
##########
@@ -42,67 +42,57 @@
 class InternalApiConfig:
     """Stores and caches configuration for Internal API."""
 
-    _initialized = False
     _use_internal_api = False
     _internal_api_endpoint = ""
 
     @staticmethod
-    def force_database_direct_access(message: str):
+    def set_use_database_access(component: str):
         """
         Block current component from using Internal API.
 
         All methods decorated with internal_api_call will always be executed 
locally.`
         This mode is needed for "trusted" components like Scheduler, 
Webserver, Internal Api server
         """
-        InternalApiConfig._initialized = True
         InternalApiConfig._use_internal_api = False
-        if _ENABLE_AIP_44:
-            logger.info("Forcing database direct access. %s", message)
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        logger.info(
+            "DB isolation mode. But this is a trusted component and DB 
connection is set. "
+            "Using database direct access when running %s.",
+            component,
+        )
 
     @staticmethod
-    def force_api_access(api_endpoint: str):
-        """
-        Force using Internal API with provided endpoint.
+    def set_use_internal_api(component: str):
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        internal_api_endpoint = ""
+        internal_api_url = conf.get("core", "internal_api_url")
+        internal_api_endpoint = internal_api_url + "/internal_api/v1/rpcapi"
+        if not internal_api_endpoint.startswith("http://";):
+            raise AirflowConfigException("[core]internal_api_url must start 
with http://";)
+        url_conf = urlparse(conf.get("core", "internal_api_url"))
+        api_path = url_conf.path
+        if api_path in ["", "/"]:
+            # Add the default path if not given in the configuration
+            api_path = "/internal_api/v1/rpcapi"
+        if url_conf.scheme not in ["http", "https"]:
+            raise AirflowConfigException("[core]internal_api_url must start 
with http:// or https://";)
+        internal_api_endpoint = 
f"{url_conf.scheme}://{url_conf.netloc}{api_path}"

Review Comment:
   `internal_api_endpoint` is already defined 



##########
.github/workflows/ci.yml:
##########
@@ -173,6 +173,7 @@ jobs:
       skip-pre-commits: ${{needs.build-info.outputs.skip-pre-commits}}
       canary-run: ${{needs.build-info.outputs.canary-run}}
       latest-versions-only: ${{needs.build-info.outputs.latest-versions-only}}
+      enable-aip-44: "true"

Review Comment:
   Which means we are going to run tests in DB isolation mode only? Should we 
run them in both mode?



##########
airflow/api_internal/internal_api_call.py:
##########
@@ -42,67 +42,57 @@
 class InternalApiConfig:
     """Stores and caches configuration for Internal API."""
 
-    _initialized = False
     _use_internal_api = False
     _internal_api_endpoint = ""
 
     @staticmethod
-    def force_database_direct_access(message: str):
+    def set_use_database_access(component: str):
         """
         Block current component from using Internal API.
 
         All methods decorated with internal_api_call will always be executed 
locally.`
         This mode is needed for "trusted" components like Scheduler, 
Webserver, Internal Api server
         """
-        InternalApiConfig._initialized = True
         InternalApiConfig._use_internal_api = False
-        if _ENABLE_AIP_44:
-            logger.info("Forcing database direct access. %s", message)
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        logger.info(
+            "DB isolation mode. But this is a trusted component and DB 
connection is set. "
+            "Using database direct access when running %s.",
+            component,
+        )
 
     @staticmethod
-    def force_api_access(api_endpoint: str):
-        """
-        Force using Internal API with provided endpoint.
+    def set_use_internal_api(component: str):
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use 
it. ")
+        internal_api_endpoint = ""
+        internal_api_url = conf.get("core", "internal_api_url")
+        internal_api_endpoint = internal_api_url + "/internal_api/v1/rpcapi"
+        if not internal_api_endpoint.startswith("http://";):
+            raise AirflowConfigException("[core]internal_api_url must start 
with http://";)
+        url_conf = urlparse(conf.get("core", "internal_api_url"))
+        api_path = url_conf.path
+        if api_path in ["", "/"]:
+            # Add the default path if not given in the configuration
+            api_path = "/internal_api/v1/rpcapi"
+        if url_conf.scheme not in ["http", "https"]:
+            raise AirflowConfigException("[core]internal_api_url must start 
with http:// or https://";)

Review Comment:
   There is an inconsistency between this check and the one on line 72



##########
airflow/cli/commands/internal_api_command.py:
##########
@@ -222,7 +222,8 @@ def create_app(config=None, testing=False):
     if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
         flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = 
settings.prepare_engine_args()
 
-    InternalApiConfig.force_database_direct_access("Gunicorn worker 
initialization")
+    if conf.getboolean("core", "database_access_isolation", fallback=False):
+        InternalApiConfig.set_use_database_access("Gunicorn worker 
initialization")

Review Comment:
   Should we fail this command if `database_access_isolation` is `False`? The 
user is trying to run the internal API component but 
`database_access_isolation` is `False`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to