Repository: incubator-airflow Updated Branches: refs/heads/master 1f0a717b6 -> 272952a9d
[AIRFLOW-2429] Fix dag, example_dags, executors flake8 error Closes #3398 from feng-tao/flake8_p3 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/272952a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/272952a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/272952a9 Branch: refs/heads/master Commit: 272952a9dce932cb2c648f82c9f9f2cafd572ff1 Parents: 1f0a717 Author: Tao feng <[email protected]> Authored: Tue May 22 15:31:29 2018 +0100 Committer: Kaxil Naik <[email protected]> Committed: Tue May 22 15:31:29 2018 +0100 ---------------------------------------------------------------------- airflow/dag/__init__.py | 4 ++-- airflow/dag/base_dag.py | 6 ++--- airflow/example_dags/__init__.py | 5 ++--- airflow/example_dags/docker_copy_data.py | 11 ++++++---- airflow/example_dags/entrypoint.sh | 5 +++-- airflow/example_dags/example_bash_operator.py | 6 ++--- airflow/example_dags/example_branch_operator.py | 4 ++-- .../example_branch_python_dop_operator_3.py | 12 +++++----- airflow/example_dags/example_docker_operator.py | 4 ++-- airflow/example_dags/example_http_operator.py | 4 ++-- .../example_dags/example_kubernetes_executor.py | 23 ++++++++++++-------- .../example_dags/example_kubernetes_operator.py | 2 ++ airflow/example_dags/example_latest_only.py | 5 ++--- .../example_latest_only_with_trigger.py | 4 ++-- .../example_passing_params_via_test_command.py | 12 +++++----- airflow/example_dags/example_python_operator.py | 2 ++ .../example_short_circuit_operator.py | 5 +++-- airflow/example_dags/example_skip_dag.py | 4 ++-- airflow/example_dags/example_subdag_operator.py | 5 +++-- .../example_trigger_controller_dag.py | 6 ++--- .../example_dags/example_trigger_target_dag.py | 13 +++++++---- airflow/example_dags/example_xcom.py | 4 ++-- airflow/example_dags/subdags/__init__.py | 5 ++--- airflow/example_dags/subdags/subdag.py | 4 ++-- airflow/example_dags/test_utils.py | 5 ++--- airflow/example_dags/tutorial.py | 4 ++-- airflow/exceptions.py | 4 ++-- airflow/executors/__init__.py | 17 ++++++++------- airflow/executors/base_executor.py | 3 +++ airflow/executors/celery_executor.py | 8 +++---- airflow/executors/dask_executor.py | 7 +++--- airflow/executors/local_executor.py | 4 ++-- airflow/executors/sequential_executor.py | 4 ++-- 33 files changed, 117 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/dag/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/dag/__init__.py b/airflow/dag/__init__.py index 4067cc7..114d189 100644 --- a/airflow/dag/__init__.py +++ b/airflow/dag/__init__.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/dag/base_dag.py ---------------------------------------------------------------------- diff --git a/airflow/dag/base_dag.py b/airflow/dag/base_dag.py index 43b2ec1..5719f57 100644 --- a/airflow/dag/base_dag.py +++ b/airflow/dag/base_dag.py @@ -7,16 +7,16 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# + from __future__ import absolute_import from __future__ import division from __future__ import print_function http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/__init__.py b/airflow/example_dags/__init__.py index f0f8b68..114d189 100644 --- a/airflow/example_dags/__init__.py +++ b/airflow/example_dags/__init__.py @@ -7,13 +7,12 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/docker_copy_data.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py index 9f5ecb8..b037df9 100644 --- a/airflow/example_dags/docker_copy_data.py +++ b/airflow/example_dags/docker_copy_data.py @@ -16,12 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + """ This sample "listen to directory". move the new file and print it, using docker-containers. -The following operators are being used: DockerOperator, BashOperator & -ShortCircuitOperator. -TODO: Review the workflow, change it accordingly to to your environment & enable the code. +The following operators are being used: DockerOperator, +BashOperator & ShortCircuitOperator. +TODO: Review the workflow, change it accordingly to + your environment & enable the code. """ # from __future__ import print_function @@ -45,7 +47,8 @@ TODO: Review the workflow, change it accordingly to to your environment & enable # } # # dag = DAG( -# 'docker_sample_copy_data', default_args=default_args, schedule_interval=timedelta(minutes=10)) +# 'docker_sample_copy_data', default_args= +# default_args, schedule_interval=timedelta(minutes=10)) # # locate_file_cmd = """ # sleep 10 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/entrypoint.sh ---------------------------------------------------------------------- diff --git a/airflow/example_dags/entrypoint.sh b/airflow/example_dags/entrypoint.sh index 15858fa..a5a2940 100644 --- a/airflow/example_dags/entrypoint.sh +++ b/airflow/example_dags/entrypoint.sh @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -6,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 439e4f4..b2d9d14 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -45,7 +45,7 @@ run_this.set_downstream(run_this_last) for i in range(3): i = str(i) task = BashOperator( - task_id='runme_'+i, + task_id='runme_' + i, bash_command='echo "{{ task_instance_key_str }}" && sleep 1', dag=dag) task.set_downstream(run_this) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_branch_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 14824b8..45bf11f 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_branch_python_dop_operator_3.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index 42ee127..7be55a5 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -7,22 +7,20 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# import airflow from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG -from datetime import datetime, timedelta args = { 'owner': 'airflow', @@ -33,12 +31,14 @@ args = { # BranchPython operator that depends on past # and where tasks may run or be skipped on # alternating runs -dag = DAG(dag_id='example_branch_dop_operator_v3',schedule_interval='*/1 * * * *', default_args=args) +dag = DAG(dag_id='example_branch_dop_operator_v3', + schedule_interval='*/1 * * * *', default_args=args) def should_run(ds, **kwargs): - print("------------- exec dttm = {} and minute = {}".format(kwargs['execution_date'], kwargs['execution_date'].minute)) + print('------------- exec dttm = {} and minute = {}'. + format(kwargs['execution_date'], kwargs['execution_date'].minute)) if kwargs['execution_date'].minute % 2 == 0: return "oper_1" else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_docker_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_docker_operator.py b/airflow/example_dags/example_docker_operator.py index cad8c54..1ddf90f 100644 --- a/airflow/example_dags/example_docker_operator.py +++ b/airflow/example_dags/example_docker_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_http_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py index 7d9b13a..da7ea3f 100644 --- a/airflow/example_dags/example_http_operator.py +++ b/airflow/example_dags/example_http_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index 2a02ef6..1d9bb73 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -1,16 +1,21 @@ # -*- coding: utf-8 -*- # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. from __future__ import print_function import airflow from airflow.operators.python_operator import PythonOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_kubernetes_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_kubernetes_operator.py b/airflow/example_dags/example_kubernetes_operator.py index a7013ec..8f5ab39 100644 --- a/airflow/example_dags/example_kubernetes_operator.py +++ b/airflow/example_dags/example_kubernetes_operator.py @@ -1,3 +1,5 @@ +# -*- coding: utf-8 -*- +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_latest_only.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index d91bdc8..fdb2dca 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -25,7 +25,6 @@ import airflow from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator -from airflow.utils.trigger_rule import TriggerRule dag = DAG( dag_id='latest_only', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_latest_only_with_trigger.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 90a31b2..b8f4811 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_passing_params_via_test_command.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index e32fe0a..7efca2f 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -7,16 +7,15 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -# from datetime import timedelta import airflow @@ -24,6 +23,7 @@ from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator + dag = DAG("example_passing_params_via_test_command", default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(1)}, @@ -43,6 +43,7 @@ def my_py_command(ds, **kwargs): print(" 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"])) return 1 + my_templated_command = """ echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} " echo " 'miff was passed in via BashOperator with value {{ params.miff }} " @@ -52,12 +53,13 @@ run_this = PythonOperator( task_id='run_this', provide_context=True, python_callable=my_py_command, - params={"miff":"agg"}, + params={"miff": "agg"}, dag=dag) + also_run_this = BashOperator( task_id='also_run_this', bash_command=my_templated_command, - params={"miff":"agg"}, + params={"miff": "agg"}, dag=dag) also_run_this.set_upstream(run_this) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_python_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 2825eda..0ecf099 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -16,6 +16,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + from __future__ import print_function from builtins import range import airflow @@ -45,6 +46,7 @@ def print_context(ds, **kwargs): print(ds) return 'Whatever you return gets printed in the logs' + run_this = PythonOperator( task_id='print_the_context', provide_context=True, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_short_circuit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index c40ccb7..8e9565d 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -7,15 +7,16 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + import airflow from airflow.operators.python_operator import ShortCircuitOperator from airflow.operators.dummy_operator import DummyOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_skip_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index 896c3e3..f11ca59 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_subdag_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py index 1d051c4..ffd254b 100644 --- a/airflow/example_dags/example_subdag_operator.py +++ b/airflow/example_dags/example_subdag_operator.py @@ -7,15 +7,16 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + import airflow from airflow.models import DAG http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_trigger_controller_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 3fa8896..f5c7218 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -44,7 +44,7 @@ pp = pprint.PrettyPrinter(indent=4) def conditionally_trigger(context, dag_run_obj): """This function decides whether or not to Trigger the remote DAG""" - c_p =context['params']['condition_param'] + c_p = context['params']['condition_param'] print("Controller DAG : conditionally_trigger = {}".format(c_p)) if context['params']['condition_param']: dag_run_obj.payload = {'message': context['params']['message']} http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_trigger_target_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index ae531bf..7a656f2 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -7,15 +7,16 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.models import DAG @@ -53,7 +54,9 @@ dag = DAG( def run_this_func(ds, **kwargs): - print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message'])) + print("Remotely received value of {} for key=message". + format(kwargs['dag_run'].conf['message'])) + run_this = PythonOperator( task_id='run_this', @@ -61,8 +64,10 @@ run_this = PythonOperator( python_callable=run_this_func, dag=dag) + # You can also access the DagRun object in templates bash_task = BashOperator( task_id="bash_task", - bash_command='echo "Here is the message: {{ dag_run.conf["message"] if dag_run else "" }}" ', + bash_command='echo "Here is the message: ' + '{{ dag_run.conf["message"] if dag_run else "" }}" ', dag=dag) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/example_xcom.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 9b3a5ed..66bec9a 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/subdags/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/subdags/__init__.py b/airflow/example_dags/subdags/__init__.py index f0f8b68..114d189 100644 --- a/airflow/example_dags/subdags/__init__.py +++ b/airflow/example_dags/subdags/__init__.py @@ -7,13 +7,12 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/subdags/subdag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py index 02c5c15..6a67c7d 100644 --- a/airflow/example_dags/subdags/subdag.py +++ b/airflow/example_dags/subdags/subdag.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/test_utils.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py index fae2604..fb8792a 100644 --- a/airflow/example_dags/test_utils.py +++ b/airflow/example_dags/test_utils.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,7 +20,6 @@ import airflow from airflow.operators.bash_operator import BashOperator from airflow.models import DAG -from datetime import datetime dag = DAG( dag_id='test_utils', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/example_dags/tutorial.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index 3892d11..ad81733 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/exceptions.py ---------------------------------------------------------------------- diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 4d2db50..96e58b4 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/executors/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py index 7ae396c..b91a24e 100644 --- a/airflow/executors/__init__.py +++ b/airflow/executors/__init__.py @@ -7,25 +7,26 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + import sys from airflow.utils.log.logging_mixin import LoggingMixin from airflow import configuration from airflow.exceptions import AirflowException -from airflow.executors.base_executor import BaseExecutor from airflow.executors.local_executor import LocalExecutor from airflow.executors.sequential_executor import SequentialExecutor DEFAULT_EXECUTOR = None + def _integrate_plugins(): """Integrate plugins to the context.""" from airflow.plugins_manager import executors_modules @@ -33,6 +34,7 @@ def _integrate_plugins(): sys.modules[executors_module.__name__] = executors_module globals()[executors_module._name] = executors_module + def GetDefaultExecutor(): """Creates a new instance of the configured executor if none exists and returns it""" global DEFAULT_EXECUTOR @@ -59,10 +61,10 @@ class Executors: KubernetesExecutor = "KubernetesExecutor" - def _get_executor(executor_name): """ - Creates a new instance of the named executor. In case the executor name is not know in airflow, + Creates a new instance of the named executor. + In case the executor name is not know in airflow, look for it in the plugins """ if executor_name == Executors.LocalExecutor: @@ -87,11 +89,10 @@ def _get_executor(executor_name): executor_path = executor_name.split('.') if len(executor_path) != 2: raise AirflowException( - "Executor {0} not supported: please specify in format plugin_module.executor".format(executor_name)) + "Executor {0} not supported: " + "please specify in format plugin_module.executor".format(executor_name)) if executor_path[0] in globals(): return globals()[executor_path[0]].__dict__[executor_path[1]]() else: raise AirflowException("Executor {0} not supported.".format(executor_name)) - - http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 0648f9b..04c9088 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -16,11 +16,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + from builtins import range from airflow import configuration from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import State + + PARALLELISM = configuration.conf.getint('core', 'PARALLELISM') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 2de7c46..6cfd2d3 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -81,8 +81,8 @@ class CeleryExecutor(BaseExecutor): def execute_async(self, key, command, queue=DEFAULT_CELERY_CONFIG['task_default_queue'], executor_config=None): - self.log.info( "[celery] queuing {key} through celery, " - "queue={queue}".format(**locals())) + self.log.info("[celery] queuing {key} through celery, " + "queue={queue}".format(**locals())) self.tasks[key] = execute_command.apply_async( args=[command], queue=queue) self.last_state[key] = celery_states.PENDING http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/executors/dask_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/dask_executor.py b/airflow/executors/dask_executor.py index 42716ee..a6ba677 100644 --- a/airflow/executors/dask_executor.py +++ b/airflow/executors/dask_executor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -59,7 +59,8 @@ class DaskExecutor(BaseExecutor): def execute_async(self, key, command, queue=None, executor_config=None): if queue is not None: warnings.warn( - 'DaskExecutor does not support queues. All tasks will be run in the same cluster' + 'DaskExecutor does not support queues. ' + 'All tasks will be run in the same cluster' ) def airflow_run(): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/executors/local_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/local_executor.py b/airflow/executors/local_executor.py index 9f75948..0c85262 100644 --- a/airflow/executors/local_executor.py +++ b/airflow/executors/local_executor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/272952a9/airflow/executors/sequential_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/sequential_executor.py b/airflow/executors/sequential_executor.py index 39153b8..9c0d8ec 100644 --- a/airflow/executors/sequential_executor.py +++ b/airflow/executors/sequential_executor.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
