patrickmckenna commented on a change in pull request #2206: [AIRFLOW-922] Update PrestoHook to enable synchronous execution
URL: https://github.com/apache/airflow/pull/2206#discussion_r252914208
 
 

 ##########
 File path: airflow/hooks/presto_hook.py
 ##########
 @@ -117,11 +118,49 @@ def get_pandas_df(self, hql, parameters=None):
             df = pandas.DataFrame()
         return df
 
-    def run(self, hql, parameters=None):
+    def run(self, sql, parameters=None, poll_interval=None):
         """
-        Execute the statement against Presto. Can be used to create views.
+        Execute statement(s) against Presto. By default, statements are
+        executed asynchronously. To execute each synchronously, pass a non-None
+        poll_interval.
+
+        :param sql: the statement(s) to be executed
+        :type sql: str or iterable
+        :param parameters: the parameters to render the statement(s) with
+        :type parameters: mapping or iterable
+        :param poll_interval: how often, in seconds, to check the execution
+            status of each statement; set to None (the default) to execute
+            asynchronously
+        :type poll_interval: int or float
         """
-        return super(PrestoHook, self).run(self._strip_sql(hql), parameters)
+        if isinstance(sql, str):
+            sql = [sql]
+
+        cursor = self.get_conn().cursor()
+
+        for stmt in sql:
+            stmt = self._strip_sql(stmt)
+            self.log.info("{} with parameters {}".format(stmt, parameters))
+            cursor.execute(stmt, parameters)
+
+            if poll_interval is not None:
+                while not self.execution_finished(cursor):
+                    time.sleep(poll_interval)
+
+    def execution_finished(self, cursor):
+        """
+        Return a bool indicating whether the latest statement executed by
+        cursor has finished executing. If the execution status can't be
+        determined, e.g. because of a network problem, returns None.
+
+        :param cursor: a cursor
+        :type cursor: presto.Cursor
+        """
+        try:
+            return cursor.poll() is None
+        except Exception as ex:
+            msg = "Couldn't determine statement execution status: {}".format(ex)
+            self.log.error(msg)
 
 Review comment:
   @mik-laj ah, is that the agreed upon style preference? Happy to change it, 
just wasn't aware (didn't see anything in the docs or linting tests enforcing 
that, but may very well have missed it 😄).
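Below is a minimal, self-contained sketch of the synchronous-execution pattern in the diff, so the polling loop can be exercised without a Presto server. It assumes, as execution_finished() does, that cursor.poll() returns None once the latest statement has finished. FakeCursor, its polls_until_done / remaining_polls / poll_calls attributes, and the status dict it returns are hypothetical stand-ins, not part of PyHive or Airflow. The sketch passes the exception to the logger as a %s argument so its text actually appears in the message.

```python
import logging
import time

log = logging.getLogger(__name__)


class FakeCursor:
    """Hypothetical stand-in for a Presto DB-API cursor (not a real PyHive class).

    poll() returns a status dict while the latest statement is "running" and
    None once it has finished, matching the assumption in execution_finished().
    """

    def __init__(self, polls_until_done=3):
        self.polls_until_done = polls_until_done
        self.remaining_polls = 0
        self.poll_calls = 0
        self.executed = []

    def execute(self, stmt, parameters=None):
        self.executed.append((stmt, parameters))
        self.remaining_polls = self.polls_until_done

    def poll(self):
        self.poll_calls += 1
        if self.remaining_polls > 0:
            self.remaining_polls -= 1
            return {"stats": {"state": "RUNNING"}}  # arbitrary placeholder status
        return None


def execution_finished(cursor):
    """Return True/False, or None if the status could not be determined."""
    try:
        return cursor.poll() is None
    except Exception as ex:
        # Passing ex as an argument ensures its text appears in the log line.
        log.error("Couldn't determine statement execution status: %s", ex)
        return None


def run(cursor, sql, parameters=None, poll_interval=None):
    """Execute statement(s); block on each one when poll_interval is not None."""
    if isinstance(sql, str):
        sql = [sql]
    for stmt in sql:
        cursor.execute(stmt, parameters)
        if poll_interval is not None:
            # None (status unknown) is falsy, so polling continues on errors.
            while not execution_finished(cursor):
                time.sleep(poll_interval)


# Synchronous: each statement is polled until poll() returns None.
cursor = FakeCursor(polls_until_done=2)
run(cursor, ["CREATE TABLE t AS SELECT 1", "SELECT * FROM t"], poll_interval=0)

# Asynchronous (default): execute() is called but poll() never is.
async_cursor = FakeCursor(polls_until_done=2)
run(async_cursor, "SELECT 1")
```

One consequence of this design: because execution_finished() returns None when poll() raises, and `not None` is True, an error that persists keeps the `while` loop polling indefinitely rather than failing the task.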

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 