This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a458fcc Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
a458fcc is described below
commit a458fcc573845ff65244a2dafd204ed70129f3e8
Author: Josh Fell <[email protected]>
AuthorDate: Mon Sep 27 12:19:23 2021 -0400
Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
---
.../amazon/aws/example_dags/example_s3_bucket.py | 11 +-
.../aws/example_dags/example_s3_to_redshift.py | 31 +++--
.../hive/example_dags/example_twitter_README.md | 2 +-
.../hive/example_dags/example_twitter_dag.py | 134 ++++++++-------------
.../apache/kylin/example_dags/example_kylin_dag.py | 64 ++++------
.../google/cloud/example_dags/example_s3_to_gcs.py | 7 +-
.../example_dags/example_jenkins_job_trigger.py | 22 ++--
.../azure/example_dags/example_fileshare.py | 9 +-
.../papermill/example_dags/example_papermill.py | 7 +-
.../qubole/example_dags/example_qubole.py | 49 +++-----
.../providers/sqlite/example_dags/create_table.sql | 24 ++++
.../sqlite/example_dags/example_sqlite.py | 27 ++---
12 files changed, 160 insertions(+), 227 deletions(-)
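The common thread across these files is replacing PythonOperator boilerplate with TaskFlow-decorated functions. A minimal, self-contained sketch of that conversion, with illustrative names not taken from any of the files above:

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.utils.dates import days_ago


# Before this change the pattern was: define a callable, then wrap it in
# PythonOperator(task_id=..., python_callable=...). With TaskFlow the callable
# itself is the task; calling it inside a DAG context registers it.
@task(task_id="say_hello")
def say_hello():
    """Illustrative task body; the return value is pushed to XCom automatically."""
    return "hello"


with DAG(
    dag_id="taskflow_conversion_sketch",  # illustrative dag_id
    schedule_interval=None,
    start_date=days_ago(1),
    tags=["example"],
) as dag:
    say_hello()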
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
index ceeb4b2..ca226bc 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -16,8 +16,8 @@
# under the License.
import os
+from airflow.decorators import task
from airflow.models.dag import DAG
-from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago
@@ -25,6 +25,7 @@ from airflow.utils.dates import days_ago
BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
+@task(task_id="s3_bucket_dag_add_keys_to_bucket")
def upload_keys():
"""This is a python callback to add keys into the s3 bucket"""
# add keys to bucket
@@ -41,6 +42,7 @@ with DAG(
dag_id='s3_bucket_dag',
schedule_interval=None,
start_date=days_ago(2),
+ default_args={"bucket_name": BUCKET_NAME},
max_active_runs=1,
tags=['example'],
) as dag:
@@ -48,17 +50,14 @@ with DAG(
# [START howto_operator_s3_bucket]
create_bucket = S3CreateBucketOperator(
task_id='s3_bucket_dag_create',
- bucket_name=BUCKET_NAME,
region_name='us-east-1',
)
- add_keys_to_bucket = PythonOperator(
- task_id="s3_bucket_dag_add_keys_to_bucket", python_callable=upload_keys
- )
+ # Using a task-decorated function to add keys
+ add_keys_to_bucket = upload_keys()
delete_bucket = S3DeleteBucketOperator(
task_id='s3_bucket_dag_delete',
- bucket_name=BUCKET_NAME,
force_delete=True,
)
# [END howto_operator_s3_bucket]
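The bucket_name argument removed from both operators above now comes from the DAG-level default_args, which Airflow applies to any operator whose constructor accepts an argument of that name. A hedged sketch of the mechanism; the bucket name and dag_id are placeholders:

from airflow.models.dag import DAG
from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="default_args_sketch",  # illustrative
    schedule_interval=None,
    start_date=days_ago(1),
    # Any operator in this DAG with a `bucket_name` parameter picks this up
    # unless it passes its own value explicitly.
    default_args={"bucket_name": "my-placeholder-bucket"},
) as dag:
    create_bucket = S3CreateBucketOperator(task_id="create_bucket", region_name="us-east-1")
    delete_bucket = S3DeleteBucketOperator(task_id="delete_bucket", force_delete=True)
    create_bucket >> delete_bucket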
diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
index 1a7c911..9cec527 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -22,7 +22,8 @@ This is an example dag for using `S3ToRedshiftOperator` to copy a S3 key into a
from os import getenv
from airflow import DAG
-from airflow.operators.python import PythonOperator
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
@@ -35,12 +36,14 @@ REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")
# [END howto_operator_s3_to_redshift_env_variables]
-def _add_sample_data_to_s3():
+@task(task_id='setup__add_sample_data_to_s3')
+def add_sample_data_to_s3():
s3_hook = S3Hook()
s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET, replace=True)
-def _remove_sample_data_from_s3():
+@task(task_id='teardown__remove_sample_data_from_s3')
+def remove_sample_data_from_s3():
s3_hook = S3Hook()
if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET):
s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{REDSHIFT_TABLE}')
@@ -49,9 +52,8 @@ def _remove_sample_data_from_s3():
with DAG(
dag_id="example_s3_to_redshift", start_date=days_ago(1),
schedule_interval=None, tags=['example']
) as dag:
- setup__task_add_sample_data_to_s3 = PythonOperator(
- python_callable=_add_sample_data_to_s3, task_id='setup__add_sample_data_to_s3'
- )
+ add_sample_data_to_s3 = add_sample_data_to_s3()
+
setup__task_create_table = PostgresOperator(
sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
postgres_conn_id='redshift_default',
@@ -72,14 +74,11 @@ with DAG(
postgres_conn_id='redshift_default',
task_id='teardown__drop_table',
)
- teardown__task_remove_sample_data_from_s3 = PythonOperator(
- python_callable=_remove_sample_data_from_s3, task_id='teardown__remove_sample_data_from_s3'
- )
- (
- [setup__task_add_sample_data_to_s3, setup__task_create_table]
- >> task_transfer_s3_to_redshift
- >> [
- teardown__task_drop_table,
- teardown__task_remove_sample_data_from_s3,
- ]
+
+ remove_sample_data_from_s3 = remove_sample_data_from_s3()
+
+ chain(
+ [add_sample_data_to_s3, setup__task_create_table],
+ task_transfer_s3_to_redshift,
+ [teardown__task_drop_table, remove_sample_data_from_s3],
)
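The chain() helper used above accepts a mix of single tasks and lists, which keeps the setup/teardown fan-out readable. A small hedged sketch with placeholder tasks:

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="chain_sketch", schedule_interval=None, start_date=days_ago(1)) as dag:
    setup_a = DummyOperator(task_id="setup_a")
    setup_b = DummyOperator(task_id="setup_b")
    work = DummyOperator(task_id="work")
    teardown_a = DummyOperator(task_id="teardown_a")
    teardown_b = DummyOperator(task_id="teardown_b")

    # Equivalent to: [setup_a, setup_b] >> work >> [teardown_a, teardown_b]
    chain([setup_a, setup_b], work, [teardown_a, teardown_b])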
diff --git a/airflow/providers/apache/hive/example_dags/example_twitter_README.md b/airflow/providers/apache/hive/example_dags/example_twitter_README.md
index c22ca2c..8f88983 100644
--- a/airflow/providers/apache/hive/example_dags/example_twitter_README.md
+++ b/airflow/providers/apache/hive/example_dags/example_twitter_README.md
@@ -35,7 +35,7 @@
***Screenshot:***
<img src="http://i.imgur.com/rRpSO12.png" width="99%"/>
-***Example Structure:*** In this example dag, we are collecting tweets for four users account or twitter handle. Each twitter handle has two channels, incoming tweets and outgoing tweets. Hence, in this example, by running the fetch_tweet task, we should have eight output files. For better management, each of the eight output files should be saved with the yesterday's date (we are collecting tweets from yesterday), i.e. toTwitter_A_2016-03-21.csv. We are using three kind of operators: Py [...]
+***Example Structure:*** In this example dag, we are collecting tweets for four users account or twitter handle. Each twitter handle has two channels, incoming tweets and outgoing tweets. Hence, in this example, by running the fetch_tweet task, we should have eight output files. For better management, each of the eight output files should be saved with the yesterday's date (we are collecting tweets from yesterday), i.e. toTwitter_A_2016-03-21.csv. We are using two kinds of operators (Bas [...]
The python functions here are just placeholders. In case you are interested to actually make this DAG fully functional, first start with filling out the scripts as separate files and importing them into the DAG with absolute or relative import. My approach was to store the retrieved data in memory using Pandas dataframe first, and then use the built in method to save the CSV file on hard-disk.
The eight different CSV files are then put into eight different folders within HDFS. Each of the newly inserted files are then loaded into eight different external hive tables. Hive tables can be external or internal. In this case, we are inserting the data right into the table, and so we are making our tables internal. Each file is inserted into the respected Hive table named after the twitter channel, i.e. toTwitter_A or fromTwitter_A. It is also important to note that when we created [...]
diff --git a/airflow/providers/apache/hive/example_dags/example_twitter_dag.py b/airflow/providers/apache/hive/example_dags/example_twitter_dag.py
index de67457..d501706 100644
--- a/airflow/providers/apache/hive/example_dags/example_twitter_dag.py
+++ b/airflow/providers/apache/hive/example_dags/example_twitter_dag.py
@@ -31,38 +31,42 @@ This is an example dag for managing twitter data.
from datetime import date, timedelta
from airflow import DAG
+from airflow.decorators import task
from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.utils.dates import days_ago
-# --------------------------------------------------------------------------------
-# Create a few placeholder scripts. In practice these would be different python
-# script files, which are imported in this section with absolute or relative imports
-# --------------------------------------------------------------------------------
-
-def fetchtweets():
+@task
+def fetch_tweets():
"""
- This is a placeholder for fetchtweets.
+ This task should call Twitter API and retrieve tweets from yesterday from and to for the four twitter
+ users (Twitter_A,..,Twitter_D) There should be eight csv output files generated by this task and naming
+ convention is direction(from or to)_twitterHandle_date.csv
"""
-def cleantweets():
+@task
+def clean_tweets():
"""
- This is a placeholder for cleantweets.
+ This is a placeholder to clean the eight files. In this step you can get rid of or cherry pick columns
+ and different parts of the text.
"""
-def analyzetweets():
+@task
+def analyze_tweets():
"""
- This is a placeholder for analyzetweets.
+ This is a placeholder to analyze the twitter data. Could simply be a sentiment analysis through algorithms
+ like bag of words or something more complicated. You can also take a look at Web Services to do such
+ tasks.
"""
-def transfertodb():
+@task
+def transfer_to_db():
"""
- This is a placeholder for transfertodb.
+ This is a placeholder to extract summary from Hive data and store it to MySQL.
"""
@@ -70,53 +74,18 @@ with DAG(
dag_id='example_twitter_dag',
default_args={
'owner': 'Ekhtiar',
- 'depends_on_past': False,
- 'email': ['[email protected]'],
- 'email_on_failure': False,
- 'email_on_retry': False,
'retries': 1,
- 'retry_delay': timedelta(minutes=5),
},
schedule_interval="@daily",
start_date=days_ago(5),
tags=['example'],
) as dag:
+ fetch_tweets = fetch_tweets()
+ clean_tweets = clean_tweets()
+ analyze_tweets = analyze_tweets()
+ hive_to_mysql = transfer_to_db()
- # --------------------------------------------------------------------------------
- # This task should call Twitter API and retrieve tweets from yesterday from and to
- # for the four twitter users (Twitter_A,..,Twitter_D) There should be eight csv
- # output files generated by this task and naming convention
- # is direction(from or to)_twitterHandle_date.csv
- # --------------------------------------------------------------------------------
-
- fetch_tweets = PythonOperator(task_id='fetch_tweets', python_callable=fetchtweets)
-
- # --------------------------------------------------------------------------------
- # Clean the eight files. In this step you can get rid of or cherry pick columns
- # and different parts of the text
- # --------------------------------------------------------------------------------
-
- clean_tweets = PythonOperator(task_id='clean_tweets', python_callable=cleantweets)
-
- clean_tweets << fetch_tweets
-
- # --------------------------------------------------------------------------------
- # In this section you can use a script to analyze the twitter data. Could simply
- # be a sentiment analysis through algorithms like bag of words or something more
- # complicated. You can also take a look at Web Services to do such tasks
- # --------------------------------------------------------------------------------
-
- analyze_tweets = PythonOperator(task_id='analyze_tweets', python_callable=analyzetweets)
-
- analyze_tweets << clean_tweets
-
- # --------------------------------------------------------------------------------
- # Although this is the last task, we need to declare it before the next tasks as we
- # will use set_downstream This task will extract summary from Hive data and store
- # it to MySQL
- # --------------------------------------------------------------------------------
-
- hive_to_mysql = PythonOperator(task_id='hive_to_mysql', python_callable=transfertodb)
+ fetch_tweets >> clean_tweets >> analyze_tweets
# --------------------------------------------------------------------------------
# The following tasks are generated using for loop. The first task puts the eight
@@ -137,49 +106,42 @@ with DAG(
for channel in to_channels:
- file_name = "to_" + channel + "_" + yesterday.strftime("%Y-%m-%d") +
".csv"
+ file_name = f"to_{channel}_{dt}.csv"
load_to_hdfs = BashOperator(
- task_id="put_" + channel + "_to_hdfs",
- bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
- + local_dir
- + file_name
- + hdfs_dir
- + channel
- + "/",
+ task_id=f"put_{channel}_to_hdfs",
+ bash_command=(
+ f"HADOOP_USER_NAME=hdfs hadoop fs -put -f
{local_dir}{file_name}{hdfs_dir}{channel}/"
+ ),
)
- load_to_hdfs << analyze_tweets
-
load_to_hive = HiveOperator(
- task_id="load_" + channel + "_to_hive",
- hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name +
"' "
- "INTO TABLE " + channel + " "
- "PARTITION(dt='" + dt + "')",
+ task_id=f"load_{channel}_to_hive",
+ hql=(
+ f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}'"
+ f"INTO TABLE {channel}"
+ f"PARTITION(dt='{dt}')"
+ ),
)
- load_to_hive << load_to_hdfs
- load_to_hive >> hive_to_mysql
+
+ analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql
for channel in from_channels:
- file_name = "from_" + channel + "_" + yesterday.strftime("%Y-%m-%d") +
".csv"
+ file_name = f"from_{channel}_{dt}.csv"
load_to_hdfs = BashOperator(
- task_id="put_" + channel + "_to_hdfs",
- bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
- + local_dir
- + file_name
- + hdfs_dir
- + channel
- + "/",
+ task_id=f"put_{channel}_to_hdfs",
+ bash_command=(
+ f"HADOOP_USER_NAME=hdfs hadoop fs -put -f
{local_dir}{file_name}{hdfs_dir}{channel}/"
+ ),
)
- load_to_hdfs << analyze_tweets
-
load_to_hive = HiveOperator(
- task_id="load_" + channel + "_to_hive",
- hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name +
"' "
- "INTO TABLE " + channel + " "
- "PARTITION(dt='" + dt + "')",
+ task_id=f"load_{channel}_to_hive",
+ hql=(
+ f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
+ f"INTO TABLE {channel} "
+ f"PARTITION(dt='{dt}')"
+ ),
)
- load_to_hive << load_to_hdfs
- load_to_hive >> hive_to_mysql
+ analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql
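The loop above now mixes a decorated task with BashOperator and HiveOperator instances, using f-strings to keep each generated task_id unique. A hedged sketch of that shape with placeholder channels and commands:

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="loop_sketch", schedule_interval=None, start_date=days_ago(1)) as dag:

    @task
    def analyze():
        """Placeholder for the upstream analysis step."""

    analyze_done = analyze()

    # One load task per channel; the f-string task_id keeps every id unique.
    for channel in ["A", "B"]:
        load = BashOperator(
            task_id=f"load_{channel}",
            bash_command=f"echo loading channel {channel}",
        )
        analyze_done >> load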
diff --git a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
index 0e8becd..22ae853 100644
--- a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
+++ b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
@@ -21,7 +21,6 @@ This is an example DAG which uses the KylinCubeOperator.
The tasks below include kylin build, refresh, merge operation.
"""
from airflow import DAG
-from airflow.operators.python import PythonOperator
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from airflow.utils.dates import days_ago
@@ -29,42 +28,37 @@ dag = DAG(
dag_id='example_kylin_operator',
schedule_interval=None,
start_date=days_ago(1),
+ default_args={'kylin_conn_id': 'kylin_default', 'project': 'learn_kylin', 'cube': 'kylin_sales_cube'},
tags=['example'],
)
-def gen_build_time(**kwargs):
[email protected]
+def gen_build_time():
"""
- Gen build time and push to xcom
- :param kwargs:
- :return:
+ Gen build time and push to XCom (with key of "return_value")
+ :return: A dict with build time values.
"""
- ti = kwargs['ti']
- ti.xcom_push(key='date_start', value='1325347200000')
- ti.xcom_push(key='date_end', value='1325433600000')
+ return {'date_start': '1325347200000', 'date_end': '1325433600000'}
-gen_build_time_task = PythonOperator(python_callable=gen_build_time, task_id='gen_build_time', dag=dag)
+gen_build_time = gen_build_time()
+gen_build_time_output_date_start = gen_build_time['date_start']
+gen_build_time_output_date_end = gen_build_time['date_end']
build_task1 = KylinCubeOperator(
task_id="kylin_build_1",
- kylin_conn_id='kylin_default',
- project='learn_kylin',
- cube='kylin_sales_cube',
command='build',
- start_time=gen_build_time_task.output['date_start'],
- end_time=gen_build_time_task.output['date_end'],
+ start_time=gen_build_time_output_date_start,
+ end_time=gen_build_time_output_date_end,
is_track_job=True,
dag=dag,
)
build_task2 = KylinCubeOperator(
task_id="kylin_build_2",
- kylin_conn_id='kylin_default',
- project='learn_kylin',
- cube='kylin_sales_cube',
command='build',
- start_time='1325433600000',
+ start_time=gen_build_time_output_date_end,
end_time='1325520000000',
is_track_job=True,
dag=dag,
@@ -72,23 +66,17 @@ build_task2 = KylinCubeOperator(
refresh_task1 = KylinCubeOperator(
task_id="kylin_refresh_1",
- kylin_conn_id='kylin_default',
- project='learn_kylin',
- cube='kylin_sales_cube',
command='refresh',
- start_time='1325347200000',
- end_time='1325433600000',
+ start_time=gen_build_time_output_date_start,
+ end_time=gen_build_time_output_date_end,
is_track_job=True,
dag=dag,
)
merge_task = KylinCubeOperator(
task_id="kylin_merge",
- kylin_conn_id='kylin_default',
- project='learn_kylin',
- cube='kylin_sales_cube',
command='merge',
- start_time='1325347200000',
+ start_time=gen_build_time_output_date_start,
end_time='1325520000000',
is_track_job=True,
dag=dag,
@@ -96,35 +84,29 @@ merge_task = KylinCubeOperator(
disable_task = KylinCubeOperator(
task_id="kylin_disable",
- kylin_conn_id='kylin_default',
- project='learn_kylin',
- cube='kylin_sales_cube',
command='disable',
dag=dag,
)
purge_task = KylinCubeOperator(
task_id="kylin_purge",
- kylin_conn_id='kylin_default',
- project='learn_kylin',
- cube='kylin_sales_cube',
command='purge',
dag=dag,
)
build_task3 = KylinCubeOperator(
task_id="kylin_build_3",
- kylin_conn_id='kylin_default',
- project='learn_kylin',
- cube='kylin_sales_cube',
command='build',
- start_time='1325433600000',
- end_time='1325520000000',
+ start_time=gen_build_time_output_date_end,
+ end_time='1328730000000',
dag=dag,
)
-build_task1 >> build_task2 >> refresh_task1 >> merge_task
-merge_task >> disable_task >> purge_task >> build_task3
+build_task1 >> build_task2 >> refresh_task1 >> merge_task >> disable_task >> purge_task >> build_task3
# Task dependency created via `XComArgs`:
-# gen_build_time_task >> build_task1
+# gen_build_time >> build_task1
+# gen_build_time >> build_task2
+# gen_build_time >> refresh_task1
+# gen_build_time >> merge_task
+# gen_build_time >> build_task3
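The kylin change keys into the decorated task's output to feed several operators. A hedged sketch of that pattern; multiple_outputs=True makes each dict key its own XCom entry so it can be referenced individually:

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.utils.dates import days_ago

with DAG(dag_id="keyed_output_sketch", schedule_interval=None, start_date=days_ago(1)) as dag:

    @task(multiple_outputs=True)
    def gen_window():
        # Each key is pushed as a separate XCom entry.
        return {"date_start": "1325347200000", "date_end": "1325433600000"}

    @task
    def report(start, end):
        print(f"window: {start} .. {end}")

    window = gen_window()
    # Indexing the output yields keyed XComArgs; passing them in creates the
    # gen_window >> report dependency implicitly.
    report(window["date_start"], window["date_end"])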
diff --git a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
index 118abfb..334aac5 100644
--- a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
@@ -18,7 +18,7 @@
import os
from airflow import models
-from airflow.operators.python import PythonOperator
+from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
@@ -32,6 +32,7 @@ UPLOAD_FILE = '/tmp/example-file.txt'
PREFIX = 'TESTS'
+@task(task_id='upload_file_to_s3')
def upload_file():
"""A callable to upload file to AWS bucket"""
s3_hook = S3Hook()
@@ -48,8 +49,6 @@ with models.DAG(
task_id="create_s3_bucket", bucket_name=S3BUCKET_NAME,
region_name='us-east-1'
)
- upload_to_s3 = PythonOperator(task_id='upload_file_to_s3', python_callable=upload_file)
-
create_gcs_bucket = GCSCreateBucketOperator(
task_id="create_bucket",
bucket_name=GCS_BUCKET,
@@ -69,7 +68,7 @@ with models.DAG(
(
create_s3_bucket
- >> upload_to_s3
+ >> upload_file()
>> create_gcs_bucket
>> transfer_to_gcs
>> delete_s3_bucket
diff --git a/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py b/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py
index 66f4803..ce2c8ac 100644
--- a/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py
+++ b/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py
@@ -15,22 +15,21 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-from datetime import datetime, timedelta
+from datetime import datetime
from requests import Request
from airflow import DAG
-from airflow.operators.python import PythonOperator
+from airflow.decorators import task
from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator
+JENKINS_CONNECTION_ID = "your_jenkins_connection"
+
with DAG(
"test_jenkins",
default_args={
- "owner": "airflow",
"retries": 1,
- "retry_delay": timedelta(minutes=5),
- "depends_on_past": False,
"concurrency": 8,
"max_active_runs": 8,
},
@@ -42,16 +41,17 @@ with DAG(
job_name="generate-merlin-config",
parameters={"first_parameter": "a_value", "second_parameter": "18"},
# parameters="resources/parameter.json", You can also pass a path to a
json file containing your param
- jenkins_connection_id="your_jenkins_connection", # T he connection
must be configured first
+ jenkins_connection_id=JENKINS_CONNECTION_ID, # The connection must be
configured first
)
+ @task
def grab_artifact_from_jenkins(url):
"""
Grab an artifact from the previous job
The python-jenkins library doesn't expose a method for that
But it's totally possible to build manually the request for that
"""
- hook = JenkinsHook("your_jenkins_connection")
+ hook = JenkinsHook(JENKINS_CONNECTION_ID)
jenkins_server = hook.get_jenkins_server()
# The JenkinsJobTriggerOperator store the job url in the xcom variable corresponding to the task
# You can then use it to access things or to get the job number
@@ -61,11 +61,7 @@ with DAG(
response = jenkins_server.jenkins_open(request)
return response # We store the artifact content in a xcom variable for later use
- artifact_grabber = PythonOperator(
- task_id='artifact_grabber',
- python_callable=grab_artifact_from_jenkins,
- op_kwargs=dict(url=job_trigger.output),
- )
+ grab_artifact_from_jenkins(job_trigger.output)
# Task dependency created via `XComArgs`:
- # job_trigger >> artifact_grabber
+ # job_trigger >> grab_artifact_from_jenkins()
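As the updated comment notes, the dependency is created by the XComArg flowing into the decorated function rather than by an explicit >>. A minimal hedged sketch using a BashOperator in place of the Jenkins trigger:

from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(dag_id="xcomarg_sketch", schedule_interval=None, start_date=days_ago(1)) as dag:
    # BashOperator pushes its last line of stdout to XCom by default.
    producer = BashOperator(task_id="producer", bash_command="echo http://example.com/job/42")

    @task
    def consume(url):
        # `url` resolves to the producer's pushed XCom value at runtime.
        print(f"consuming {url}")

    # Passing producer.output (an XComArg) wires producer >> consume automatically.
    consume(producer.output)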
diff --git a/airflow/providers/microsoft/azure/example_dags/example_fileshare.py b/airflow/providers/microsoft/azure/example_dags/example_fileshare.py
index e3263d6..9fbb0a9 100644
--- a/airflow/providers/microsoft/azure/example_dags/example_fileshare.py
+++ b/airflow/providers/microsoft/azure/example_dags/example_fileshare.py
@@ -15,8 +15,8 @@
# specific language governing permissions and limitations
# under the License.
+from airflow.decorators import task
from airflow.models import DAG
-from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.azure_fileshare import AzureFileShareHook
from airflow.utils.dates import days_ago
@@ -24,6 +24,7 @@ NAME = 'myfileshare'
DIRECTORY = "mydirectory"
+@task
def create_fileshare():
"""Create a fileshare with directory"""
hook = AzureFileShareHook()
@@ -34,6 +35,7 @@ def create_fileshare():
raise Exception
+@task
def delete_fileshare():
"""Delete a fileshare"""
hook = AzureFileShareHook()
@@ -41,7 +43,4 @@ def delete_fileshare():
with DAG("example_fileshare", schedule_interval="@once",
start_date=days_ago(2)) as dag:
- create = PythonOperator(task_id="create-share",
python_callable=create_fileshare)
- delete = PythonOperator(task_id="delete-share",
python_callable=delete_fileshare)
-
- create >> delete
+ create_fileshare() >> delete_fileshare()
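When @task is used without an explicit task_id, as above, the id defaults to the function name, and the resulting calls can still be ordered with >>. A hedged sketch:

from airflow.decorators import task
from airflow.models import DAG
from airflow.utils.dates import days_ago


@task
def create_resource():
    """task_id defaults to the function name: create_resource."""


@task
def delete_resource():
    """task_id defaults to the function name: delete_resource."""


with DAG("decorated_chain_sketch", schedule_interval="@once", start_date=days_ago(1)) as dag:
    # Each call returns an XComArg, which still supports >> for explicit ordering.
    create_resource() >> delete_resource()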
diff --git a/airflow/providers/papermill/example_dags/example_papermill.py b/airflow/providers/papermill/example_dags/example_papermill.py
index 3ade5e2..1adb0e8 100644
--- a/airflow/providers/papermill/example_dags/example_papermill.py
+++ b/airflow/providers/papermill/example_dags/example_papermill.py
@@ -26,8 +26,8 @@ from datetime import timedelta
import scrapbook as sb
from airflow import DAG
+from airflow.decorators import task
from airflow.lineage import AUTO
-from airflow.operators.python import PythonOperator
from airflow.providers.papermill.operators.papermill import PapermillOperator
from airflow.utils.dates import days_ago
@@ -48,6 +48,7 @@ with DAG(
# [END howto_operator_papermill]
+@task
def check_notebook(inlets, execution_date):
"""
Verify the message in the notebook
@@ -76,6 +77,4 @@ with DAG(
parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
)
- check_output = PythonOperator(task_id='check_out', python_callable=check_notebook, inlets=AUTO)
-
- check_output.set_upstream(run_this)
+ run_this >> check_notebook(inlets=AUTO, execution_date="{{ execution_date }}")
diff --git a/airflow/providers/qubole/example_dags/example_qubole.py b/airflow/providers/qubole/example_dags/example_qubole.py
index 9162753..3b3fc86 100644
--- a/airflow/providers/qubole/example_dags/example_qubole.py
+++ b/airflow/providers/qubole/example_dags/example_qubole.py
@@ -21,23 +21,15 @@ import random
import textwrap
from airflow import DAG
+from airflow.decorators import task
from airflow.operators.dummy import DummyOperator
-from airflow.operators.python import BranchPythonOperator, PythonOperator
+from airflow.operators.python import BranchPythonOperator
from airflow.providers.qubole.operators.qubole import QuboleOperator
from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor
from airflow.utils.dates import days_ago
-default_args = {
- 'owner': 'airflow',
- 'depends_on_past': False,
- 'email': ['[email protected]'],
- 'email_on_failure': False,
- 'email_on_retry': False,
-}
-
with DAG(
dag_id='example_qubole_operator',
- default_args=default_args,
schedule_interval=None,
start_date=days_ago(2),
tags=['example'],
@@ -55,12 +47,17 @@ with DAG(
"""
)
- def compare_result_fn(ti):
+ @task(trigger_rule='all_done')
+ def compare_result(hive_show_table, hive_s3_location, ti=None):
"""
Compares the results of two QuboleOperator tasks.
- :param kwargs: The context of the executed task.
- :type kwargs: dict
+ :param hive_show_table: The "hive_show_table" task.
+ :type hive_show_table: QuboleOperator
+ :param hive_s3_location: The "hive_s3_location" task.
+ :type hive_s3_location: QuboleOperator
+ :param ti: The TaskInstance object.
+ :type ti: airflow.models.TaskInstance
:return: True if the files are the same, False otherwise.
:rtype: bool
"""
@@ -93,20 +90,13 @@ with DAG(
tags=['tag1', 'tag2'],
# If the script at s3 location has any qubole specific macros to be replaced
# macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
- trigger_rule="all_done",
- )
-
- compare_result = PythonOperator(
- task_id='compare_result', python_callable=compare_result_fn, trigger_rule="all_done"
)
- compare_result << [hive_show_table, hive_s3_location]
-
options = ['hadoop_jar_cmd', 'presto_cmd', 'db_query', 'spark_cmd']
branching = BranchPythonOperator(task_id='branching', python_callable=lambda: random.choice(options))
- branching << compare_result
+ [hive_show_table, hive_s3_location] >> compare_result(hive_s3_location, hive_show_table) >> branching
join = DummyOperator(task_id='join', trigger_rule='one_success')
@@ -131,11 +121,9 @@ with DAG(
command_type="pigcmd",
script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
parameters="key1=value1 key2=value2",
- trigger_rule="all_done",
)
- pig_cmd << hadoop_jar_cmd << branching
- pig_cmd >> join
+ branching >> hadoop_jar_cmd >> pig_cmd >> join
presto_cmd = QuboleOperator(task_id='presto_cmd', command_type='prestocmd', query='show tables')
@@ -144,11 +132,9 @@ with DAG(
command_type="shellcmd",
script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
parameters="param1 param2",
- trigger_rule="all_done",
)
- shell_cmd << presto_cmd << branching
- shell_cmd >> join
+ branching >> presto_cmd >> shell_cmd >> join
db_query = QuboleOperator(
task_id='db_query', command_type='dbtapquerycmd', query='show tables', db_tap_id=2064
@@ -162,11 +148,9 @@ with DAG(
db_table='exported_airline_origin_destination',
partition_spec='dt=20110104-02',
dbtap_id=2064,
- trigger_rule="all_done",
)
- db_export << db_query << branching
- db_export >> join
+ branching >> db_query >> db_export >> join
db_import = QuboleOperator(
task_id='db_import',
@@ -177,7 +161,6 @@ with DAG(
where_clause='id < 10',
parallelism=2,
dbtap_id=2064,
- trigger_rule="all_done",
)
prog = '''
@@ -213,12 +196,10 @@ with DAG(
tags='airflow_example_run',
)
- spark_cmd << db_import << branching
- spark_cmd >> join
+ branching >> db_import >> spark_cmd >> join
with DAG(
dag_id='example_qubole_sensor',
- default_args=default_args,
schedule_interval=None,
start_date=days_ago(2),
doc_md=__doc__,
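The compare_result task keeps its trigger_rule, now passed through the @task decorator, since keyword arguments on @task are forwarded to the operator it generates. A hedged sketch of that forwarding with placeholder upstream tasks:

from airflow.decorators import task
from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

with DAG("trigger_rule_sketch", schedule_interval=None, start_date=days_ago(1)) as dag:
    upstream_a = DummyOperator(task_id="upstream_a")
    upstream_b = DummyOperator(task_id="upstream_b")

    # Keyword arguments on @task are passed through to the generated operator,
    # so this task runs once both upstreams finish, whatever their state.
    @task(trigger_rule="all_done")
    def summarize():
        print("both upstream tasks have finished")

    [upstream_a, upstream_b] >> summarize()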
diff --git a/airflow/providers/sqlite/example_dags/create_table.sql b/airflow/providers/sqlite/example_dags/create_table.sql
new file mode 100644
index 0000000..0468338
--- /dev/null
+++ b/airflow/providers/sqlite/example_dags/create_table.sql
@@ -0,0 +1,24 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+
+CREATE TABLE Items (
+ item_id INT PRIMARY KEY,
+ name TEXT,
+ description TEXT
+)
diff --git a/airflow/providers/sqlite/example_dags/example_sqlite.py b/airflow/providers/sqlite/example_dags/example_sqlite.py
index 2ff5114..5799871 100644
--- a/airflow/providers/sqlite/example_dags/example_sqlite.py
+++ b/airflow/providers/sqlite/example_dags/example_sqlite.py
@@ -24,7 +24,6 @@ The second task is similar but instead calls the SQL command from an external fi
"""
from airflow import DAG
-from airflow.operators.python import PythonOperator
from airflow.providers.sqlite.hooks.sqlite import SqliteHook
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.utils.dates import days_ago
@@ -41,12 +40,11 @@ dag = DAG(
# Example of creating a task that calls a common CREATE TABLE sql command.
create_table_sqlite_task = SqliteOperator(
task_id='create_table_sqlite',
- sqlite_conn_id='sqlite_conn_id',
sql=r"""
- CREATE TABLE table_name (
- column_1 string,
- column_2 string,
- column_3 string
+ CREATE TABLE Customers (
+ customer_id INT PRIMARY KEY,
+ first_name TEXT,
+ last_name TEXT
);
""",
dag=dag,
@@ -55,37 +53,32 @@ create_table_sqlite_task = SqliteOperator(
# [END howto_operator_sqlite]
[email protected](task_id="insert_sqlite_task")
def insert_sqlite_hook():
sqlite_hook = SqliteHook("sqlite_default")
- sqlite_hook.get_conn()
rows = [('James', '11'), ('James', '22'), ('James', '33')]
target_fields = ['first_name', 'last_name']
- sqlite_hook.insert_rows(table='Customer', rows=rows, target_fields=target_fields)
+ sqlite_hook.insert_rows(table='Customers', rows=rows, target_fields=target_fields)
[email protected](task_id="replace_sqlite_task")
def replace_sqlite_hook():
sqlite_hook = SqliteHook("sqlite_default")
- sqlite_hook.get_conn()
rows = [('James', '11'), ('James', '22'), ('James', '33')]
target_fields = ['first_name', 'last_name']
- sqlite_hook.insert_rows(table='Customer', rows=rows, target_fields=target_fields, replace=True)
+ sqlite_hook.insert_rows(table='Customers', rows=rows, target_fields=target_fields, replace=True)
-insert_sqlite_task = PythonOperator(task_id="insert_sqlite_task",
python_callable=insert_sqlite_hook)
-replace_sqlite_task = PythonOperator(task_id="replace_sqlite_task",
python_callable=replace_sqlite_hook)
-
# [START howto_operator_sqlite_external_file]
# Example of creating a task that calls an sql command from an external file.
external_create_table_sqlite_task = SqliteOperator(
task_id='create_table_sqlite_external_file',
- sqlite_conn_id='sqlite_conn_id',
- sql='/scripts/create_table.sql',
- dag=dag,
+ sql='create_table.sql',
)
# [END howto_operator_sqlite_external_file]
-create_table_sqlite_task >> external_create_table_sqlite_task >> insert_sqlite_task >> replace_sqlite_task
+create_table_sqlite_task >> external_create_table_sqlite_task >> insert_sqlite_hook() >> replace_sqlite_hook()
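The external-file task above now points at create_table.sql with a relative path; values ending in .sql are treated as template file names and resolved by the Jinja loader against the DAG's folder (plus any template_searchpath set on the DAG). A hedged sketch assuming a create_table.sql sitting next to the DAG file:

from airflow.models import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.utils.dates import days_ago

with DAG("sql_file_sketch", schedule_interval=None, start_date=days_ago(1)) as dag:
    # create_table.sql is looked up relative to this DAG's folder; the default
    # sqlite_default connection is used since no sqlite_conn_id is passed.
    create_table_from_file = SqliteOperator(
        task_id="create_table_from_file",
        sql="create_table.sql",
    )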