Repository: incubator-airflow Updated Branches: refs/heads/master 15aee05dd -> f2dae7d15
[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor Closes #2180 from pdambrauskas/fix/http_hook_import Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f2dae7d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f2dae7d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f2dae7d1 Branch: refs/heads/master Commit: f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d Parents: 15aee05 Author: pdambrauskas <[email protected]> Authored: Tue Apr 4 08:39:54 2017 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Tue Apr 4 08:39:54 2017 +0200 ---------------------------------------------------------------------- airflow/operators/sensors.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f2dae7d1/airflow/operators/sensors.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index 44a97e0..ae50bc5 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -25,12 +25,12 @@ from time import sleep import re import sys -import airflow -from airflow import hooks, settings +from airflow import settings from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException from airflow.models import BaseOperator, TaskInstance from airflow.hooks.base_hook import BaseHook from airflow.hooks.hdfs_hook import HDFSHook +from airflow.hooks.http_hook import HttpHook from airflow.utils.state import State from airflow.utils.decorators import apply_defaults @@ -298,9 +298,9 @@ class NamedHivePartitionSensor(BaseSensorOperator): raise ValueError('Could not parse ' + partition) def poke(self, context): - if not hasattr(self, 'hook'): - self.hook = hooks.HiveMetastoreHook( + from airflow.hooks.hive_hooks import HiveMetastoreHook + self.hook = HiveMetastoreHook( metastore_conn_id=self.metastore_conn_id) def poke_partition(partition): @@ -369,7 +369,8 @@ class HivePartitionSensor(BaseSensorOperator): 'Poking for table {self.schema}.{self.table}, ' 'partition {self.partition}'.format(**locals())) if not hasattr(self, 'hook'): - self.hook = hooks.HiveMetastoreHook( + from airflow.hooks.hive_hooks import HiveMetastoreHook + self.hook = HiveMetastoreHook( metastore_conn_id=self.metastore_conn_id) return self.hook.check_for_partition( self.schema, self.table, self.partition) @@ -470,7 +471,8 @@ class WebHdfsSensor(BaseSensorOperator): self.webhdfs_conn_id = webhdfs_conn_id def poke(self, context): - c = airflow.hooks.webhdfs_hook.WebHDFSHook(self.webhdfs_conn_id) + from airflow.hooks.webhdfs_hook import WebHDFSHook + c = WebHDFSHook(self.webhdfs_conn_id) logging.info( 'Poking for file {self.filepath} '.format(**locals())) return c.check_for_path(hdfs_path=self.filepath) @@ -520,8 +522,8 @@ class S3KeySensor(BaseSensorOperator): self.s3_conn_id = s3_conn_id def poke(self, context): - import airflow.hooks.S3_hook - hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id) + from airflow.hooks.S3_hook import S3Hook + hook = S3Hook(s3_conn_id=self.s3_conn_id) full_url = "s3://" + self.bucket_name + "/" + self.bucket_key logging.info('Poking for key : {full_url}'.format(**locals())) if self.wildcard_match: @@ -567,8 +569,8 @@ class S3PrefixSensor(BaseSensorOperator): def poke(self, context): logging.info('Poking for prefix : {self.prefix}\n' 'in bucket s3://{self.bucket_name}'.format(**locals())) - import airflow.hooks.S3_hook - hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id) + from airflow.hooks.S3_hook import S3Hook + hook = S3Hook(s3_conn_id=self.s3_conn_id) return hook.check_for_prefix( prefix=self.prefix, delimiter=self.delimiter, @@ -660,7 +662,7 @@ class HttpSensor(BaseSensorOperator): self.extra_options = extra_options or {} self.response_check = response_check - self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id) + self.hook = HttpHook(method='GET', http_conn_id=http_conn_id) def poke(self, context): logging.info('Poking: ' + self.endpoint)
