Repository: incubator-airflow
Updated Branches:
  refs/heads/master c70d8f59c -> 683a27f2c


[AIRFLOW-1907] Pass max_ingestion_time to Druid hook

From the Druid operator we want to pass the
max_ingestion_time to the
hook, since some jobs might take considerably more
time than others.
By default we don't want to set a max ingestion
time.

Closes #2866 from Fokko/AIRFLOW-1907-pass-max-ingestion-time


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/683a27f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/683a27f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/683a27f2

Branch: refs/heads/master
Commit: 683a27f2c16e036b42226cb9d96012d0616d0aa0
Parents: c70d8f5
Author: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Authored: Tue Dec 12 10:40:46 2017 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Tue Dec 12 10:40:46 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/druid_operator.py | 8 ++++++--
 airflow/hooks/druid_hook.py                 | 4 ++--
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/contrib/operators/druid_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/druid_operator.py 
b/airflow/contrib/operators/druid_operator.py
index 6978cc3..965dc50 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -34,10 +34,11 @@ class DruidOperator(BaseOperator):
         self,
         json_index_file,
         druid_ingest_conn_id='druid_ingest_default',
+        max_ingestion_time=None,
         *args, **kwargs):
-
         super(DruidOperator, self).__init__(*args, **kwargs)
         self.conn_id = druid_ingest_conn_id
+        self.max_ingestion_time = max_ingestion_time
 
         with open(json_index_file) as data_file:
             index_spec = json.load(data_file)
@@ -49,6 +50,9 @@ class DruidOperator(BaseOperator):
         )
 
     def execute(self, context):
-        hook = DruidHook(druid_ingest_conn_id=self.conn_id)
+        hook = DruidHook(
+            druid_ingest_conn_id=self.conn_id,
+            max_ingestion_time=self.max_ingestion_time
+        )
         self.log.info("Sumitting %s", self.index_spec_str)
         hook.submit_indexing_job(self.index_spec_str)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 655f666..9ce1f9a 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -36,7 +36,7 @@ class DruidHook(BaseHook):
             self,
             druid_ingest_conn_id='druid_ingest_default',
             timeout=1,
-            max_ingestion_time=18000):
+            max_ingestion_time=None):
 
         self.druid_ingest_conn_id = druid_ingest_conn_id
         self.timeout = timeout
@@ -72,7 +72,7 @@ class DruidHook(BaseHook):
 
             sec = sec + 1
 
-            if sec > self.max_ingestion_time:
+            if self.max_ingestion_time and sec > self.max_ingestion_time:
                 # ensure that the job gets killed if the max ingestion time is 
exceeded
                 requests.post("{0}/{1}/shutdown".format(url, druid_task_id))
                 raise AirflowException('Druid ingestion took more than %s 
seconds', self.max_ingestion_time)

Reply via email to