This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e98e8037d1a Add method to retrieve Druid task status URL based on 
ingestion type (#47238)
e98e8037d1a is described below

commit e98e8037d1aa1675640a0451c9f0cd3c3deed5c5
Author: kanagaraj <[email protected]>
AuthorDate: Tue Mar 4 14:20:20 2025 +0530

    Add method to retrieve Druid task status URL based on ingestion type 
(#47238)
    
    
    ---------
    
    Co-authored-by: k0d04mr <[email protected]>
---
 .../airflow/providers/apache/druid/hooks/druid.py  | 30 ++++++++++++++++++----
 .../tests/unit/apache/druid/hooks/test_druid.py    | 12 +++++++++
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git 
a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py 
b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py
index c44a6be346e..14899c94ed7 100644
--- a/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py
+++ b/providers/apache/druid/src/airflow/providers/apache/druid/hooks/druid.py
@@ -79,24 +79,44 @@ class DruidHook(BaseHook):
         if self.timeout < 1:
             raise ValueError("Druid timeout should be equal or greater than 1")
 
+        self.status_endpoint = "druid/indexer/v1/task"
+
     @cached_property
     def conn(self) -> Connection:
         return self.get_connection(self.druid_ingest_conn_id)
 
-    def get_conn_url(self, ingestion_type: IngestionType = 
IngestionType.BATCH) -> str:
-        """Get Druid connection url."""
-        host = self.conn.host
-        port = self.conn.port
+    @property
+    def get_connection_type(self) -> str:
         if self.conn.schema:
             conn_type = self.conn.schema
         else:
             conn_type = self.conn.conn_type or "http"
+        return conn_type
+
+    def get_conn_url(self, ingestion_type: IngestionType = 
IngestionType.BATCH) -> str:
+        """Get Druid connection url."""
+        host = self.conn.host
+        port = self.conn.port
+        conn_type = self.get_connection_type
         if ingestion_type == IngestionType.BATCH:
             endpoint = self.conn.extra_dejson.get("endpoint", "")
         else:
             endpoint = self.conn.extra_dejson.get("msq_endpoint", "")
         return f"{conn_type}://{host}:{port}/{endpoint}"
 
+    def get_status_url(self, ingestion_type):
+        """Return Druid status url."""
+        if ingestion_type == IngestionType.MSQ:
+            if self.get_connection_type == "druid":
+                conn_type = self.conn.extra_dejson.get("schema", "http")
+            else:
+                conn_type = self.get_connection_type
+
+            status_endpoint = self.conn.extra_dejson.get("status_endpoint", 
self.status_endpoint)
+            return 
f"{conn_type}://{self.conn.host}:{self.conn.port}/{status_endpoint}"
+        else:
+            return self.get_conn_url(ingestion_type)
+
     def get_auth(self) -> requests.auth.HTTPBasicAuth | None:
         """
         Return username and password from connections tab as 
requests.auth.HTTPBasicAuth object.
@@ -141,7 +161,7 @@ class DruidHook(BaseHook):
             druid_task_id = req_json["task"]
         else:
             druid_task_id = req_json["taskId"]
-        druid_task_status_url = f"{self.get_conn_url()}/{druid_task_id}/status"
+        druid_task_status_url = self.get_status_url(ingestion_type) + 
f"/{druid_task_id}/status"
         self.log.info("Druid indexing task-id: %s", druid_task_id)
 
         running = True
diff --git a/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py 
b/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py
index 5b350904e65..478509480ea 100644
--- a/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py
+++ b/providers/apache/druid/tests/unit/apache/druid/hooks/test_druid.py
@@ -316,6 +316,18 @@ class TestDruidHook:
         hook = DruidHook(timeout=1, max_ingestion_time=5)
         assert hook.get_conn_url(IngestionType.MSQ) == 
"https://test_host:1/sql_ingest";
 
+    
@patch("airflow.providers.apache.druid.hooks.druid.DruidHook.get_connection")
+    def test_get_status_url(self, mock_get_connection):
+        get_conn_value = MagicMock()
+        get_conn_value.host = "test_host"
+        get_conn_value.conn_type = "http"
+        get_conn_value.schema = "https"
+        get_conn_value.port = "1"
+        get_conn_value.extra_dejson = {"endpoint": "ingest", "msq_endpoint": 
"sql_ingest"}
+        mock_get_connection.return_value = get_conn_value
+        hook = DruidHook(timeout=1, max_ingestion_time=5)
+        assert hook.get_status_url(IngestionType.MSQ) == 
"https://test_host:1/druid/indexer/v1/task";
+
     
@patch("airflow.providers.apache.druid.hooks.druid.DruidHook.get_connection")
     def test_get_auth(self, mock_get_connection):
         get_conn_value = MagicMock()

Reply via email to