[AIRFLOW-285] Use Airflow 2.0 style imports for all remaining hooks/operators
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dc84fdec Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dc84fdec Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dc84fdec Branch: refs/heads/master Commit: dc84fdecdfd9de12392c4e1c92005bd427d3ca37 Parents: dce633e Author: Chris Riccomini <chr...@wepay.com> Authored: Mon Jun 27 20:08:48 2016 -0700 Committer: Chris Riccomini <chr...@wepay.com> Committed: Tue Jun 28 13:34:47 2016 -0700 ---------------------------------------------------------------------- .../example_dags/example_qubole_operator.py | 5 +- .../contrib/example_dags/example_twitter_dag.py | 3 +- .../operators/bigquery_check_operator.py | 2 +- airflow/contrib/operators/qubole_operator.py | 2 +- airflow/contrib/operators/vertica_operator.py | 2 +- airflow/contrib/operators/vertica_to_hive.py | 2 +- airflow/example_dags/example_bash_operator.py | 3 +- airflow/example_dags/example_branch_operator.py | 3 +- .../example_branch_python_dop_operator_3.py | 3 +- airflow/example_dags/example_http_operator.py | 2 +- .../example_passing_params_via_test_command.py | 3 +- airflow/example_dags/example_python_operator.py | 2 +- .../example_short_circuit_operator.py | 3 +- airflow/example_dags/example_skip_dag.py | 2 +- airflow/example_dags/example_subdag_operator.py | 3 +- .../example_trigger_controller_dag.py | 2 +- .../example_dags/example_trigger_target_dag.py | 3 +- airflow/example_dags/example_xcom.py | 6 +- airflow/example_dags/subdags/subdag.py | 2 +- airflow/example_dags/test_utils.py | 2 +- airflow/example_dags/tutorial.py | 2 +- airflow/operators/check_operator.py | 2 +- airflow/operators/http_operator.py | 2 +- airflow/operators/presto_check_operator.py | 2 +- airflow/operators/sensors.py | 4 +- airflow/operators/sqlite_operator.py | 2 +- airflow/utils/helpers.py | 37 +++++----- airflow/utils/logging.py | 2 +- airflow/www/views.py | 3 +- docs/concepts.rst | 4 +- docs/tutorial.rst | 6 +- tests/core.py | 71 ++++++++++++-------- tests/dags/test_backfill_pooled_tasks.py | 2 +- tests/dags/test_issue_1225.py | 4 +- tests/dags/test_scheduler_dags.py | 2 +- tests/jobs.py | 2 +- tests/models.py | 4 +- tests/operators/subdag_operator.py | 4 +- 38 files changed, 121 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py index 4f974e2..63cccd3 100644 --- a/airflow/contrib/example_dags/example_qubole_operator.py +++ b/airflow/contrib/example_dags/example_qubole_operator.py @@ -13,8 +13,9 @@ # limitations under the License. from airflow import DAG -from airflow.operators import DummyOperator, PythonOperator, BranchPythonOperator -from airflow.contrib.operators import QuboleOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import PythonOperator, BranchPythonOperator +from airflow.contrib.operators.qubole_operator import QuboleOperator from datetime import datetime, timedelta import filecmp import random http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/example_dags/example_twitter_dag.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py index af1978e..d63b4e8 100644 --- a/airflow/contrib/example_dags/example_twitter_dag.py +++ b/airflow/contrib/example_dags/example_twitter_dag.py @@ -23,7 +23,8 @@ # -------------------------------------------------------------------------------- from airflow import DAG -from airflow.operators import BashOperator, PythonOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator from airflow.operators.hive_operator import HiveOperator from datetime import datetime, date, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/bigquery_check_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_check_operator.py b/airflow/contrib/operators/bigquery_check_operator.py index 87e0ad7..10f0b7c 100644 --- a/airflow/contrib/operators/bigquery_check_operator.py +++ b/airflow/contrib/operators/bigquery_check_operator.py @@ -13,7 +13,7 @@ # limitations under the License. from airflow.contrib.hooks.bigquery_hook import BigQueryHook -from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator +from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py index 9923cec..cbf15c4 100755 --- a/airflow/contrib/operators/qubole_operator.py +++ b/airflow/contrib/operators/qubole_operator.py @@ -14,7 +14,7 @@ from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults -from airflow.contrib.hooks import QuboleHook +from airflow.contrib.hooks.qubole_hook import QuboleHook class QuboleOperator(BaseOperator): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/vertica_operator.py b/airflow/contrib/operators/vertica_operator.py index 471018f..9266563 100644 --- a/airflow/contrib/operators/vertica_operator.py +++ b/airflow/contrib/operators/vertica_operator.py @@ -14,7 +14,7 @@ import logging -from airflow.contrib.hooks import VerticaHook +from airflow.contrib.hooks.vertica_hook import VerticaHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/contrib/operators/vertica_to_hive.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/vertica_to_hive.py b/airflow/contrib/operators/vertica_to_hive.py index 4071111..57e4fa8 100644 --- a/airflow/contrib/operators/vertica_to_hive.py +++ b/airflow/contrib/operators/vertica_to_hive.py @@ -19,7 +19,7 @@ import logging from tempfile import NamedTemporaryFile from airflow.hooks.hive_hooks import HiveCliHook -from airflow.contrib.hooks import VerticaHook +from airflow.contrib.hooks.vertica_hook import VerticaHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index c759f4d..0d18bcf 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. from builtins import range -from airflow.operators import BashOperator, DummyOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG from datetime import datetime, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index edd177a..cc559d0 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from airflow.operators import BranchPythonOperator, DummyOperator +from airflow.operators.python_operator import BranchPythonOperator +from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG from datetime import datetime, timedelta import random http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_branch_python_dop_operator_3.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index ff959fc..19bb183 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -13,7 +13,8 @@ # limitations under the License. # -from airflow.operators import BranchPythonOperator, DummyOperator +from airflow.operators.python_operator import BranchPythonOperator +from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG from datetime import datetime, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_http_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py index 41ea385..12b0448 100644 --- a/airflow/example_dags/example_http_operator.py +++ b/airflow/example_dags/example_http_operator.py @@ -15,7 +15,7 @@ ### Example HTTP operator and sensor """ from airflow import DAG -from airflow.operators import SimpleHttpOperator +from airflow.operators.http_operator import SimpleHttpOperator from airflow.operators.sensors import HttpSensor from datetime import datetime, timedelta import json http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_passing_params_via_test_command.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index af473d9..cd5a251 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -16,7 +16,8 @@ from datetime import datetime, timedelta from airflow import DAG -from airflow.operators import BashOperator, PythonOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator dag = DAG("example_passing_params_via_test_command", default_args={"owner" : "airflow", http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_python_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index a2f8abd..6c0b93f 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -13,7 +13,7 @@ # limitations under the License. from __future__ import print_function from builtins import range -from airflow.operators import PythonOperator +from airflow.operators.python_operator import PythonOperator from airflow.models import DAG from datetime import datetime, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_short_circuit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index 907cf51..92efe99 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from airflow.operators import ShortCircuitOperator, DummyOperator +from airflow.operators.python_operator import ShortCircuitOperator +from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG import airflow.utils.helpers from datetime import datetime, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_skip_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index b55c3a8..a38b126 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from airflow.operators import DummyOperator +from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG from datetime import datetime, timedelta from airflow.exceptions import AirflowSkipException http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_subdag_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py index 57a62c6..b872f43 100644 --- a/airflow/example_dags/example_subdag_operator.py +++ b/airflow/example_dags/example_subdag_operator.py @@ -14,7 +14,8 @@ from datetime import datetime from airflow.models import DAG -from airflow.operators import DummyOperator, SubDagOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.subdag_operator import SubDagOperator from airflow.example_dags.subdags.subdag import subdag http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_controller_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index b754d64..eb8cee0 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -29,7 +29,7 @@ This example illustrates the following features : """ from airflow import DAG -from airflow.operators import TriggerDagRunOperator +from airflow.operators.dagrun_operator import TriggerDagRunOperator from datetime import datetime import pprint http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_trigger_target_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 41a3e36..a2a85f6 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from airflow.operators import BashOperator, PythonOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator from airflow.models import DAG from datetime import datetime http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/example_xcom.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 71cd44e..8dd2666 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -56,13 +56,13 @@ def puller(**kwargs): v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning']) assert (v1, v2) == (value_1, value_2) -push1 = airflow.operators.PythonOperator( +push1 = airflow.operators.python_operator.PythonOperator( task_id='push', dag=dag, python_callable=push) -push2 = airflow.operators.PythonOperator( +push2 = airflow.operators.python_operator.PythonOperator( task_id='push_by_returning', dag=dag, python_callable=push_by_returning) -pull = airflow.operators.PythonOperator( +pull = airflow.operators.python_operator.PythonOperator( task_id='puller', dag=dag, python_callable=puller) pull.set_upstream([push1, push2]) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/subdags/subdag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py index c0e1326..82e1dd1 100644 --- a/airflow/example_dags/subdags/subdag.py +++ b/airflow/example_dags/subdags/subdag.py @@ -13,7 +13,7 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators import DummyOperator +from airflow.operators.dummy_operator import DummyOperator def subdag(parent_dag_name, child_dag_name, args): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/test_utils.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py index 38e50d0..70391c3 100644 --- a/airflow/example_dags/test_utils.py +++ b/airflow/example_dags/test_utils.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Used for unit tests""" -from airflow.operators import BashOperator +from airflow.operators.bash_operator import BashOperator from airflow.models import DAG from datetime import datetime http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/example_dags/tutorial.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index 6bb2cd3..7c89666 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -18,7 +18,7 @@ Documentation that goes along with the Airflow tutorial located [here](http://pythonhosted.org/airflow/tutorial.html) """ from airflow import DAG -from airflow.operators import BashOperator +from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta seven_days_ago = datetime.combine(datetime.today() - timedelta(7), http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/check_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/check_operator.py b/airflow/operators/check_operator.py index e4c8262..83190eb 100644 --- a/airflow/operators/check_operator.py +++ b/airflow/operators/check_operator.py @@ -17,7 +17,7 @@ from builtins import str import logging from airflow.exceptions import AirflowException -from airflow.hooks import BaseHook +from airflow.hooks.base_hook import BaseHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/http_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/http_operator.py b/airflow/operators/http_operator.py index ad9bd4f..e5cf339 100644 --- a/airflow/operators/http_operator.py +++ b/airflow/operators/http_operator.py @@ -15,7 +15,7 @@ import logging from airflow.exceptions import AirflowException -from airflow.hooks import HttpHook +from airflow.hooks.http_hook import HttpHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/presto_check_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/presto_check_operator.py b/airflow/operators/presto_check_operator.py index c1ac9cf..6207460 100644 --- a/airflow/operators/presto_check_operator.py +++ b/airflow/operators/presto_check_operator.py @@ -13,7 +13,7 @@ # limitations under the License. from airflow.hooks.presto_hook import PrestoHook -from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator +from airflow.operators.check_operator import CheckOperator, ValueCheckOperator, IntervalCheckOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sensors.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index 4e4cb3b..90a4d14 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -25,7 +25,7 @@ import airflow from airflow import hooks, settings from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException from airflow.models import BaseOperator, TaskInstance, Connection as DB -from airflow.hooks import BaseHook +from airflow.hooks.base_hook import BaseHook from airflow.utils.state import State from airflow.utils.decorators import apply_defaults @@ -519,7 +519,7 @@ class HttpSensor(BaseSensorOperator): self.extra_options = extra_options or {} self.response_check = response_check - self.hook = hooks.HttpHook(method='GET', http_conn_id=http_conn_id) + self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id) def poke(self, context): logging.info('Poking: ' + self.endpoint) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/operators/sqlite_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/sqlite_operator.py b/airflow/operators/sqlite_operator.py index 52b3b4b..0ff4d05 100644 --- a/airflow/operators/sqlite_operator.py +++ b/airflow/operators/sqlite_operator.py @@ -14,7 +14,7 @@ import logging -from airflow.hooks import SqliteHook +from airflow.hooks.sqlite_hook import SqliteHook from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/helpers.py ---------------------------------------------------------------------- diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index c79ebee..7e3426e 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -25,6 +25,7 @@ import logging import os import re import sys +import warnings from airflow.exceptions import AirflowException @@ -163,8 +164,9 @@ def pprinttable(rows): class AirflowImporter(object): """ Importer that dynamically loads a class and module from its parent. This - allows Airflow to support `from airflow.operators import BashOperator` even - though BashOperator is actually in airflow.operators.bash_operator. + allows Airflow to support `from airflow.operators.bash_operator import + BashOperator` even though BashOperator is actually in + airflow.operators.bash_operator. The importer also takes over for the parent_module by wrapping it. This is required to support attribute-based usage: @@ -182,9 +184,9 @@ class AirflowImporter(object): classes. :type module_attributes: string """ - self.parent_module = parent_module - self.attribute_modules = self._build_attribute_modules(module_attributes) - self.loaded_modules = {} + self._parent_module = parent_module + self._attribute_modules = self._build_attribute_modules(module_attributes) + self._loaded_modules = {} # Wrap the module so we can take over __getattr__. sys.modules[parent_module.__name__] = self @@ -215,34 +217,33 @@ class AirflowImporter(object): """ Load the class attribute if it hasn't been loaded yet, and return it. """ - module = self.attribute_modules.get(attribute, False) + module = self._attribute_modules.get(attribute, False) if not module: # This shouldn't happen. The check happens in find_modules, too. raise ImportError(attribute) - elif module not in self.loaded_modules: + elif module not in self._loaded_modules: # Note that it's very important to only load a given modules once. # If they are loaded more than once, the memory reference to the # class objects changes, and Python thinks that an object of type # Foo that was declared before Foo's module was reloaded is no # longer the same type as Foo after it's reloaded. - path = os.path.realpath(self.parent_module.__file__) + path = os.path.realpath(self._parent_module.__file__) folder = os.path.dirname(path) f, filename, description = imp.find_module(module, [folder]) - self.loaded_modules[module] = imp.load_module(module, f, filename, description) + self._loaded_modules[module] = imp.load_module(module, f, filename, description) # This functionality is deprecated, and AirflowImporter should be # removed in 2.0. - from zope.deprecation import deprecated as _deprecated - _deprecated( - attribute, + warnings.warn( "Importing {i} directly from {m} has been " "deprecated. Please import from " "'{m}.[operator_module]' instead. Support for direct " "imports will be dropped entirely in Airflow 2.0.".format( - i=attribute, m=self.parent_module)) + i=attribute, m=self._parent_module), + DeprecationWarning) - loaded_module = self.loaded_modules[module] + loaded_module = self._loaded_modules[module] return getattr(loaded_module, attribute) @@ -259,12 +260,12 @@ class AirflowImporter(object): It also allows normal from imports to work: - from airflow.operators import BashOperator + from airflow.operators.bash_operator import BashOperator """ - if hasattr(self.parent_module, attribute): + if hasattr(self._parent_module, attribute): # Always default to the parent module if the attribute exists. - return getattr(self.parent_module, attribute) - elif attribute in self.attribute_modules: + return getattr(self._parent_module, attribute) + elif attribute in self._attribute_modules: # Try and import the attribute if it's got a module defined. loaded_attribute = self._load_attribute(attribute) setattr(self, attribute, loaded_attribute) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/utils/logging.py ---------------------------------------------------------------------- diff --git a/airflow/utils/logging.py b/airflow/utils/logging.py index 8f5fc51..79c6cbf 100644 --- a/airflow/utils/logging.py +++ b/airflow/utils/logging.py @@ -128,7 +128,7 @@ class GCSLog(object): self.hook = None try: - from airflow.contrib.hooks import GoogleCloudStorageHook + from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook self.hook = GoogleCloudStorageHook( google_cloud_storage_conn_id=remote_conn_id) except: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index 1fb3f91..0bd5b05 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -60,7 +60,8 @@ from airflow.exceptions import AirflowException from airflow.settings import Session from airflow.models import XCom -from airflow.operators import BaseOperator, SubDagOperator +from airflow.models import BaseOperator +from airflow.operators.subdag_operator import SubDagOperator from airflow.utils.logging import LoggingMixin from airflow.utils.json import json_ser http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/concepts.rst ---------------------------------------------------------------------- diff --git a/docs/concepts.rst b/docs/concepts.rst index 6e15ff8..31e7d61 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -485,7 +485,7 @@ the main UI. For example: #dags/subdag.py from airflow.models import DAG - from airflow.operators import DummyOperator + from airflow.operators.dummy_operator import DummyOperator # Dag is returned by a factory method @@ -510,7 +510,7 @@ This SubDAG can then be referenced in your main DAG file: # main_dag.py from datetime import datetime, timedelta from airflow.models import DAG - from airflow.operators import SubDagOperator + from airflow.operators.subdag_operator import SubDagOperator from dags.subdag import sub_dag http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/docs/tutorial.rst ---------------------------------------------------------------------- diff --git a/docs/tutorial.rst b/docs/tutorial.rst index e9d382b..a93479c 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -18,7 +18,7 @@ complicated, a line by line explanation follows below. https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py """ from airflow import DAG - from airflow.operators import BashOperator + from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta @@ -100,7 +100,7 @@ Airflow DAG object. Let's start by importing the libraries we will need. from airflow import DAG # Operators; we need this to operate! - from airflow.operators import BashOperator + from airflow.operators.bash_operator import BashOperator Default Arguments ----------------- @@ -270,7 +270,7 @@ something like this: http://airflow.readthedocs.org/en/latest/tutorial.html """ from airflow import DAG - from airflow.operators import BashOperator + from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 9c688ea..002ad30 100644 --- a/tests/core.py +++ b/tests/core.py @@ -36,8 +36,17 @@ from airflow.executors import SequentialExecutor, LocalExecutor from airflow.models import Variable configuration.test_mode() -from airflow import jobs, models, DAG, operators, hooks, utils, macros, settings, exceptions -from airflow.hooks import BaseHook +from airflow import jobs, models, DAG, utils, macros, settings, exceptions +from airflow.models import BaseOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.check_operator import CheckOperator, ValueCheckOperator +from airflow.operators.dagrun_operator import TriggerDagRunOperator +from airflow.operators.python_operator import PythonOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.http_operator import SimpleHttpOperator +from airflow.operators import sensors +from airflow.hooks.base_hook import BaseHook +from airflow.hooks.sqlite_hook import SqliteHook from airflow.bin import cli from airflow.www import app as application from airflow.settings import Session @@ -85,7 +94,7 @@ def reset(dag_id=TEST_DAG_ID): reset() -class OperatorSubclass(operators.BaseOperator): +class OperatorSubclass(BaseOperator): """ An operator to test template substitution """ @@ -305,7 +314,7 @@ class CoreTest(unittest.TestCase): assert hash(self.dag) != hash(dag_subclass) def test_time_sensor(self): - t = operators.sensors.TimeSensor( + t = sensors.TimeSensor( task_id='time_sensor_check', target_time=time(0), dag=self.dag) @@ -319,14 +328,14 @@ class CoreTest(unittest.TestCase): captainHook.run("CREATE TABLE operator_test_table (a, b)") captainHook.run("insert into operator_test_table values (1,2)") - t = operators.CheckOperator( + t = CheckOperator( task_id='check', sql="select count(*) from operator_test_table", conn_id=conn_id, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - t = operators.ValueCheckOperator( + t = ValueCheckOperator( task_id='value_check', pass_value=95, tolerance=0.1, @@ -350,7 +359,7 @@ class CoreTest(unittest.TestCase): Tests that Operators reject illegal arguments """ with warnings.catch_warnings(record=True) as w: - t = operators.BashOperator( + t = BashOperator( task_id='test_illegal_args', bash_command='echo success', dag=self.dag, @@ -362,14 +371,14 @@ class CoreTest(unittest.TestCase): w[0].message.args[0]) def test_bash_operator(self): - t = operators.BashOperator( + t = BashOperator( task_id='time_sensor_check', bash_command="echo success", dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_bash_operator_multi_byte_output(self): - t = operators.BashOperator( + t = BashOperator( task_id='test_multi_byte_bash_operator', bash_command=u"echo \u2600", dag=self.dag, @@ -381,7 +390,7 @@ class CoreTest(unittest.TestCase): if True: return obj - t = operators.TriggerDagRunOperator( + t = TriggerDagRunOperator( task_id='test_trigger_dagrun', trigger_dag_id='example_bash_operator', python_callable=trigga, @@ -389,7 +398,7 @@ class CoreTest(unittest.TestCase): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_dryrun(self): - t = operators.BashOperator( + t = BashOperator( task_id='time_sensor_check', bash_command="echo success", dag=self.dag) @@ -404,14 +413,14 @@ class CoreTest(unittest.TestCase): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_timedelta_sensor(self): - t = operators.sensors.TimeDeltaSensor( + t = sensors.TimeDeltaSensor( task_id='timedelta_sensor_check', delta=timedelta(seconds=2), dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_external_task_sensor(self): - t = operators.sensors.ExternalTaskSensor( + t = sensors.ExternalTaskSensor( task_id='test_external_task_sensor_check', external_dag_id=TEST_DAG_ID, external_task_id='time_sensor_check', @@ -419,7 +428,7 @@ class CoreTest(unittest.TestCase): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_external_task_sensor_delta(self): - t = operators.sensors.ExternalTaskSensor( + t = sensors.ExternalTaskSensor( task_id='test_external_task_sensor_check_delta', external_dag_id=TEST_DAG_ID, external_task_id='time_sensor_check', @@ -429,7 +438,7 @@ class CoreTest(unittest.TestCase): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_timeout(self): - t = operators.PythonOperator( + t = PythonOperator( task_id='test_timeout', execution_timeout=timedelta(seconds=1), python_callable=lambda: sleep(5), @@ -444,7 +453,7 @@ class CoreTest(unittest.TestCase): if not templates_dict['ds'] == ds: raise Exception("failure") - t = operators.PythonOperator( + t = PythonOperator( task_id='test_py_op', provide_context=True, python_callable=test_py_op, @@ -460,7 +469,7 @@ class CoreTest(unittest.TestCase): task_id='test_complex_template', some_templated_field={ 'foo': '123', - 'bar': ['baz', ' {{ ds }}'] + 'bar': ['baz', '{{ ds }}'] }, on_success_callback=verify_templated_field, dag=self.dag) @@ -698,7 +707,7 @@ class CoreTest(unittest.TestCase): def test_bad_trigger_rule(self): with self.assertRaises(AirflowException): - operators.DummyOperator( + DummyOperator( task_id='test_bad_trigger', trigger_rule="non_existant", dag=self.dag) @@ -1195,7 +1204,7 @@ class HttpOpSensorTest(unittest.TestCase): @mock.patch('requests.Session', FakeSession) def test_get(self): - t = operators.SimpleHttpOperator( + t = SimpleHttpOperator( task_id='get_op', method='GET', endpoint='/search', @@ -1206,7 +1215,7 @@ class HttpOpSensorTest(unittest.TestCase): @mock.patch('requests.Session', FakeSession) def test_get_response_check(self): - t = operators.SimpleHttpOperator( + t = SimpleHttpOperator( task_id='get_op', method='GET', endpoint='/search', @@ -1218,7 +1227,7 @@ class HttpOpSensorTest(unittest.TestCase): @mock.patch('requests.Session', FakeSession) def test_sensor(self): - sensor = operators.sensors.HttpSensor( + sensor = sensors.HttpSensor( task_id='http_sensor_check', conn_id='http_default', endpoint='/search', @@ -1256,7 +1265,7 @@ class ConnectionTest(unittest.TestCase): del os.environ[ev] def test_using_env_var(self): - c = hooks.SqliteHook.get_connection(conn_id='test_uri') + c = SqliteHook.get_connection(conn_id='test_uri') assert c.host == 'ec2.compute.com' assert c.schema == 'the_database' assert c.login == 'username' @@ -1264,7 +1273,7 @@ class ConnectionTest(unittest.TestCase): assert c.port == 5432 def test_using_unix_socket_env_var(self): - c = hooks.SqliteHook.get_connection(conn_id='test_uri_no_creds') + c = SqliteHook.get_connection(conn_id='test_uri_no_creds') assert c.host == 'ec2.compute.com' assert c.schema == 'the_database' assert c.login is None @@ -1282,12 +1291,12 @@ class ConnectionTest(unittest.TestCase): assert c.port is None def test_env_var_priority(self): - c = hooks.SqliteHook.get_connection(conn_id='airflow_db') + c = SqliteHook.get_connection(conn_id='airflow_db') assert c.host != 'ec2.compute.com' os.environ['AIRFLOW_CONN_AIRFLOW_DB'] = \ 'postgres://username:passw...@ec2.compute.com:5432/the_database' - c = hooks.SqliteHook.get_connection(conn_id='airflow_db') + c = SqliteHook.get_connection(conn_id='airflow_db') assert c.host == 'ec2.compute.com' assert c.schema == 'the_database' assert c.login == 'username' @@ -1311,15 +1320,21 @@ class WebHDFSHookTest(unittest.TestCase): assert c.proxy_user == 'someone' -@unittest.skipUnless("S3Hook" in dir(hooks), - "Skipping test because S3Hook is not installed") +try: + from airflow.hooks.S3_hook import S3Hook +except ImportError: + S3Hook = None + + +@unittest.skipIf(S3Hook is None, + "Skipping test because S3Hook is not installed") class S3HookTest(unittest.TestCase): def setUp(self): configuration.test_mode() self.s3_test_url = "s3://test/this/is/not/a-real-key.txt" def test_parse_s3_url(self): - parsed = hooks.S3Hook.parse_s3_url(self.s3_test_url) + parsed = S3Hook.parse_s3_url(self.s3_test_url) self.assertEqual(parsed, ("test", "this/is/not/a-real-key.txt"), "Incorrect parsing of the s3 url") http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_backfill_pooled_tasks.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_backfill_pooled_tasks.py b/tests/dags/test_backfill_pooled_tasks.py index 306db7d..4b2ba8f 100644 --- a/tests/dags/test_backfill_pooled_tasks.py +++ b/tests/dags/test_backfill_pooled_tasks.py @@ -21,7 +21,7 @@ Addresses issue #1225. from datetime import datetime from airflow.models import DAG -from airflow.operators import DummyOperator +from airflow.operators.dummy_operator import DummyOperator dag = DAG(dag_id='test_backfill_pooled_task_dag') task = DummyOperator( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_issue_1225.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_issue_1225.py b/tests/dags/test_issue_1225.py index ecfa646..8f43b08 100644 --- a/tests/dags/test_issue_1225.py +++ b/tests/dags/test_issue_1225.py @@ -21,7 +21,9 @@ Addresses issue #1225. from datetime import datetime from airflow.models import DAG -from airflow.operators import DummyOperator, PythonOperator, SubDagOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.python_operator import PythonOperator +from airflow.operators.subdag_operator import SubDagOperator from airflow.utils.trigger_rule import TriggerRule import time http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/dags/test_scheduler_dags.py ---------------------------------------------------------------------- diff --git a/tests/dags/test_scheduler_dags.py b/tests/dags/test_scheduler_dags.py index ac291e0..224e7c5 100644 --- a/tests/dags/test_scheduler_dags.py +++ b/tests/dags/test_scheduler_dags.py @@ -15,7 +15,7 @@ from datetime import datetime from airflow.models import DAG -from airflow.operators import DummyOperator +from airflow.operators.dummy_operator import DummyOperator DEFAULT_DATE = datetime(2100, 1, 1) # DAG tests backfill with pooled tasks http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 0619f3d..2f53fbc 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -26,7 +26,7 @@ from airflow.bin import cli from airflow.executors import DEFAULT_EXECUTOR from airflow.jobs import BackfillJob, SchedulerJob from airflow.models import DAG, DagModel, DagBag, DagRun, Pool, TaskInstance as TI -from airflow.operators import DummyOperator +from airflow.operators.dummy_operator import DummyOperator from airflow.utils.db import provide_session from airflow.utils.state import State from airflow.utils.timeout import timeout http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 2aae476..e4f5aa8 100644 --- a/tests/models.py +++ b/tests/models.py @@ -27,7 +27,9 @@ from airflow.exceptions import AirflowSkipException from airflow.models import DAG, TaskInstance as TI from airflow.models import State as ST from airflow.models import DagModel -from airflow.operators import DummyOperator, BashOperator, PythonOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.python_operator import PythonOperator from airflow.utils.state import State from mock import patch from nose_parameterized import parameterized http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dc84fdec/tests/operators/subdag_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py index 0006f60..0a7be23 100644 --- a/tests/operators/subdag_operator.py +++ b/tests/operators/subdag_operator.py @@ -18,7 +18,9 @@ import unittest import airflow from airflow.models import DAG, DagBag -from airflow.operators import BashOperator, DummyOperator, SubDagOperator +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy_operator import DummyOperator +from airflow.operators.subdag_operator import SubDagOperator from airflow.jobs import BackfillJob from airflow.exceptions import AirflowException