gopidesupavan commented on code in PR #41511:
URL: https://github.com/apache/airflow/pull/41511#discussion_r1719969696


##########
airflow/providers/amazon/aws/operators/athena.py:
##########
@@ -126,63 +130,97 @@ def execute(self, context: Context) -> str | None:
         self.query_execution_context["Catalog"] = self.catalog
         if self.output_location:
             self.result_configuration["OutputLocation"] = self.output_location
-        self.query_execution_id = self.hook.run_query(
-            self.query,
-            self.query_execution_context,
-            self.result_configuration,
-            self.client_request_token,
-            self.workgroup,
-        )
-        AthenaQueryResultsLink.persist(
-            context=context,
-            operator=self,
-            region_name=self.hook.conn_region_name,
-            aws_partition=self.hook.conn_partition,
-            query_execution_id=self.query_execution_id,
-        )
 
-        if self.deferrable:
-            self.defer(
-                trigger=AthenaTrigger(
-                    query_execution_id=self.query_execution_id,
-                    waiter_delay=self.sleep_time,
-                    waiter_max_attempts=self.max_polling_attempts,
-                    aws_conn_id=self.aws_conn_id,
-                    region_name=self.region_name,
-                    verify=self.verify,
-                    botocore_config=self.botocore_config,
-                ),
-                method_name="execute_complete",
+        if isinstance(self.query, str):
+            if self.split_statements:
+                query_list = self._split_sql_string(self.query)
+            else:
+                query_list = [self.query] if self.query.strip() else []
+        else:
+            query_list = self.query
+
+        query_list_len = len(query_list)
+
+        if not query_list_len:
+            raise AirflowException("No queries were found to execute.")
+
+        for query in query_list:

Review Comment:
   I believe this can be changed to run in parallel. Or is there a reason to 
run the queries sequentially here? If a user submits around 10 queries and each 
takes an average of 3 minutes, we end up blocking the worker for 30 minutes. 
With all the queries running in parallel, you can monitor their execution IDs 
while in a deferrable state. The good thing is that Athena has a 
`batch_get_query_execution` API which returns the status of the queries (it is 
limited to 50 query execution IDs per call, but this can be handled easily).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to