Repository: incubator-airflow Updated Branches: refs/heads/master c70d8f59c -> 683a27f2c
[AIRFLOW-1907] Pass max_ingestion_time to Druid hook From the Druid operator we want to pass the max_ingestion_time to the hook, since some jobs might take considerably more time than the others. By default we don't want to set a max ingestion time. Closes #2866 from Fokko/AIRFLOW-1907-pass-max-ingestion-time Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/683a27f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/683a27f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/683a27f2 Branch: refs/heads/master Commit: 683a27f2c16e036b42226cb9d96012d0616d0aa0 Parents: c70d8f5 Author: Fokko Driesprong <fokkodriespr...@godatadriven.com> Authored: Tue Dec 12 10:40:46 2017 +0100 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Tue Dec 12 10:40:46 2017 +0100 ---------------------------------------------------------------------- airflow/contrib/operators/druid_operator.py | 8 ++++++-- airflow/hooks/druid_hook.py | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/contrib/operators/druid_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py index 6978cc3..965dc50 100644 --- a/airflow/contrib/operators/druid_operator.py +++ b/airflow/contrib/operators/druid_operator.py @@ -34,10 +34,11 @@ class DruidOperator(BaseOperator): self, json_index_file, druid_ingest_conn_id='druid_ingest_default', + max_ingestion_time=None, *args, **kwargs): - super(DruidOperator, self).__init__(*args, **kwargs) self.conn_id = druid_ingest_conn_id + self.max_ingestion_time = max_ingestion_time with open(json_index_file) as data_file: index_spec = json.load(data_file) 
@@ -49,6 +50,9 @@ class DruidOperator(BaseOperator): ) def execute(self, context): - hook = DruidHook(druid_ingest_conn_id=self.conn_id) + hook = DruidHook( + druid_ingest_conn_id=self.conn_id, + max_ingestion_time=self.max_ingestion_time + ) self.log.info("Sumitting %s", self.index_spec_str) hook.submit_indexing_job(self.index_spec_str) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/hooks/druid_hook.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py index 655f666..9ce1f9a 100644 --- a/airflow/hooks/druid_hook.py +++ b/airflow/hooks/druid_hook.py @@ -36,7 +36,7 @@ class DruidHook(BaseHook): self, druid_ingest_conn_id='druid_ingest_default', timeout=1, - max_ingestion_time=18000): + max_ingestion_time=None): self.druid_ingest_conn_id = druid_ingest_conn_id self.timeout = timeout @@ -72,7 +72,7 @@ class DruidHook(BaseHook): sec = sec + 1 - if sec > self.max_ingestion_time: + if self.max_ingestion_time and sec > self.max_ingestion_time: # ensure that the job gets killed if the max ingestion time is exceeded requests.post("{0}/{1}/shutdown".format(url, druid_task_id)) raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time)