This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a458fcc  Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
a458fcc is described below

commit a458fcc573845ff65244a2dafd204ed70129f3e8
Author: Josh Fell <48934154+josh-f...@users.noreply.github.com>
AuthorDate: Mon Sep 27 12:19:23 2021 -0400

    Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
---
 .../amazon/aws/example_dags/example_s3_bucket.py   |  11 +-
 .../aws/example_dags/example_s3_to_redshift.py     |  31 +++--
 .../hive/example_dags/example_twitter_README.md    |   2 +-
 .../hive/example_dags/example_twitter_dag.py       | 134 ++++++++-------------
 .../apache/kylin/example_dags/example_kylin_dag.py |  64 ++++------
 .../google/cloud/example_dags/example_s3_to_gcs.py |   7 +-
 .../example_dags/example_jenkins_job_trigger.py    |  22 ++--
 .../azure/example_dags/example_fileshare.py        |   9 +-
 .../papermill/example_dags/example_papermill.py    |   7 +-
 .../qubole/example_dags/example_qubole.py          |  49 +++-----
 .../providers/sqlite/example_dags/create_table.sql |  24 ++++
 .../sqlite/example_dags/example_sqlite.py          |  27 ++---
 12 files changed, 160 insertions(+), 227 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py 
b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
index ceeb4b2..ca226bc 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -16,8 +16,8 @@
 # under the License.
 import os
 
+from airflow.decorators import task
 from airflow.models.dag import DAG
-from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.s3_bucket import 
S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.utils.dates import days_ago
@@ -25,6 +25,7 @@ from airflow.utils.dates import days_ago
 BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345')
 
 
+@task(task_id="s3_bucket_dag_add_keys_to_bucket")
 def upload_keys():
     """This is a python callback to add keys into the s3 bucket"""
     # add keys to bucket
@@ -41,6 +42,7 @@ with DAG(
     dag_id='s3_bucket_dag',
     schedule_interval=None,
     start_date=days_ago(2),
+    default_args={"bucket_name": BUCKET_NAME},
     max_active_runs=1,
     tags=['example'],
 ) as dag:
@@ -48,17 +50,14 @@ with DAG(
     # [START howto_operator_s3_bucket]
     create_bucket = S3CreateBucketOperator(
         task_id='s3_bucket_dag_create',
-        bucket_name=BUCKET_NAME,
         region_name='us-east-1',
     )
 
-    add_keys_to_bucket = PythonOperator(
-        task_id="s3_bucket_dag_add_keys_to_bucket", python_callable=upload_keys
-    )
+    # Using a task-decorated function to add keys
+    add_keys_to_bucket = upload_keys()
 
     delete_bucket = S3DeleteBucketOperator(
         task_id='s3_bucket_dag_delete',
-        bucket_name=BUCKET_NAME,
         force_delete=True,
     )
     # [END howto_operator_s3_bucket]
diff --git 
a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py 
b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
index 1a7c911..9cec527 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
@@ -22,7 +22,8 @@ This is an example dag for using `S3ToRedshiftOperator` to 
copy a S3 key into a
 from os import getenv
 
 from airflow import DAG
-from airflow.operators.python import PythonOperator
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.transfers.s3_to_redshift import 
S3ToRedshiftOperator
 from airflow.providers.postgres.operators.postgres import PostgresOperator
@@ -35,12 +36,14 @@ REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")
 # [END howto_operator_s3_to_redshift_env_variables]
 
 
-def _add_sample_data_to_s3():
+@task(task_id='setup__add_sample_data_to_s3')
+def add_sample_data_to_s3():
     s3_hook = S3Hook()
     s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET, 
replace=True)
 
 
-def _remove_sample_data_from_s3():
+@task(task_id='teardown__remove_sample_data_from_s3')
+def remove_sample_data_from_s3():
     s3_hook = S3Hook()
     if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET):
         s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{REDSHIFT_TABLE}')
@@ -49,9 +52,8 @@ def _remove_sample_data_from_s3():
 with DAG(
     dag_id="example_s3_to_redshift", start_date=days_ago(1), 
schedule_interval=None, tags=['example']
 ) as dag:
-    setup__task_add_sample_data_to_s3 = PythonOperator(
-        python_callable=_add_sample_data_to_s3, 
task_id='setup__add_sample_data_to_s3'
-    )
+    add_sample_data_to_s3 = add_sample_data_to_s3()
+
     setup__task_create_table = PostgresOperator(
         sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name 
varchar)',
         postgres_conn_id='redshift_default',
@@ -72,14 +74,11 @@ with DAG(
         postgres_conn_id='redshift_default',
         task_id='teardown__drop_table',
     )
-    teardown__task_remove_sample_data_from_s3 = PythonOperator(
-        python_callable=_remove_sample_data_from_s3, 
task_id='teardown__remove_sample_data_from_s3'
-    )
-    (
-        [setup__task_add_sample_data_to_s3, setup__task_create_table]
-        >> task_transfer_s3_to_redshift
-        >> [
-            teardown__task_drop_table,
-            teardown__task_remove_sample_data_from_s3,
-        ]
+
+    remove_sample_data_from_s3 = remove_sample_data_from_s3()
+
+    chain(
+        [add_sample_data_to_s3, setup__task_create_table],
+        task_transfer_s3_to_redshift,
+        [teardown__task_drop_table, remove_sample_data_from_s3],
     )
diff --git 
a/airflow/providers/apache/hive/example_dags/example_twitter_README.md 
b/airflow/providers/apache/hive/example_dags/example_twitter_README.md
index c22ca2c..8f88983 100644
--- a/airflow/providers/apache/hive/example_dags/example_twitter_README.md
+++ b/airflow/providers/apache/hive/example_dags/example_twitter_README.md
@@ -35,7 +35,7 @@
 ***Screenshot:***
 <img src="http://i.imgur.com/rRpSO12.png" width="99%"/>
 
-***Example Structure:*** In this example dag, we are collecting tweets for 
four users account or twitter handle. Each twitter handle has two channels, 
incoming tweets and outgoing tweets. Hence, in this example, by running the 
fetch_tweet task, we should have eight output files. For better management, 
each of the eight output files should be saved with the yesterday's date (we 
are collecting tweets from yesterday), i.e. toTwitter_A_2016-03-21.csv. We are 
using three kind of operators: Py [...]
+***Example Structure:*** In this example dag, we are collecting tweets for 
four users account or twitter handle. Each twitter handle has two channels, 
incoming tweets and outgoing tweets. Hence, in this example, by running the 
fetch_tweet task, we should have eight output files. For better management, 
each of the eight output files should be saved with the yesterday's date (we 
are collecting tweets from yesterday), i.e. toTwitter_A_2016-03-21.csv. We are 
using two kinds of operators (Bas [...]
 
 The python functions here are just placeholders. In case you are interested to 
actually make this DAG fully functional, first start with filling out the 
scripts as separate files and importing them into the DAG with absolute or 
relative import. My approach was to store the retrieved data in memory using 
Pandas dataframe first, and then use the built in method to save the CSV file 
on hard-disk.
 The eight different CSV files are then put into eight different folders within 
HDFS. Each of the newly inserted files are then loaded into eight different 
external hive tables. Hive tables can be external or internal. In this case, we 
are inserting the data right into the table, and so we are making our tables 
internal. Each file is inserted into the respected Hive table named after the 
twitter channel, i.e. toTwitter_A or fromTwitter_A. It is also important to 
note that when we created  [...]
diff --git a/airflow/providers/apache/hive/example_dags/example_twitter_dag.py 
b/airflow/providers/apache/hive/example_dags/example_twitter_dag.py
index de67457..d501706 100644
--- a/airflow/providers/apache/hive/example_dags/example_twitter_dag.py
+++ b/airflow/providers/apache/hive/example_dags/example_twitter_dag.py
@@ -31,38 +31,42 @@ This is an example dag for managing twitter data.
 from datetime import date, timedelta
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 from airflow.providers.apache.hive.operators.hive import HiveOperator
 from airflow.utils.dates import days_ago
 
-# 
--------------------------------------------------------------------------------
-# Create a few placeholder scripts. In practice these would be different python
-# script files, which are imported in this section with absolute or relative 
imports
-# 
--------------------------------------------------------------------------------
-
 
-def fetchtweets():
+@task
+def fetch_tweets():
     """
-    This is a placeholder for fetchtweets.
+    This task should call Twitter API and retrieve tweets from yesterday from 
and to for the four twitter
+    users (Twitter_A,..,Twitter_D) There should be eight csv output files 
generated by this task and naming
+    convention is direction(from or to)_twitterHandle_date.csv
     """
 
 
-def cleantweets():
+@task
+def clean_tweets():
     """
-    This is a placeholder for cleantweets.
+    This is a placeholder to clean the eight files. In this step you can get 
rid of or cherry pick columns
+    and different parts of the text.
     """
 
 
-def analyzetweets():
+@task
+def analyze_tweets():
     """
-    This is a placeholder for analyzetweets.
+    This is a placeholder to analyze the twitter data. Could simply be a 
sentiment analysis through algorithms
+    like bag of words or something more complicated. You can also take a look 
at Web Services to do such
+    tasks.
     """
 
 
-def transfertodb():
+@task
+def transfer_to_db():
     """
-    This is a placeholder for transfertodb.
+    This is a placeholder to extract summary from Hive data and store it to 
MySQL.
     """
 
 
@@ -70,53 +74,18 @@ with DAG(
     dag_id='example_twitter_dag',
     default_args={
         'owner': 'Ekhtiar',
-        'depends_on_past': False,
-        'email': ['airf...@example.com'],
-        'email_on_failure': False,
-        'email_on_retry': False,
         'retries': 1,
-        'retry_delay': timedelta(minutes=5),
     },
     schedule_interval="@daily",
     start_date=days_ago(5),
     tags=['example'],
 ) as dag:
+    fetch_tweets = fetch_tweets()
+    clean_tweets = clean_tweets()
+    analyze_tweets = analyze_tweets()
+    hive_to_mysql = transfer_to_db()
 
-    # 
--------------------------------------------------------------------------------
-    # This task should call Twitter API and retrieve tweets from yesterday 
from and to
-    # for the four twitter users (Twitter_A,..,Twitter_D) There should be 
eight csv
-    # output files generated by this task and naming convention
-    # is direction(from or to)_twitterHandle_date.csv
-    # 
--------------------------------------------------------------------------------
-
-    fetch_tweets = PythonOperator(task_id='fetch_tweets', 
python_callable=fetchtweets)
-
-    # 
--------------------------------------------------------------------------------
-    # Clean the eight files. In this step you can get rid of or cherry pick 
columns
-    # and different parts of the text
-    # 
--------------------------------------------------------------------------------
-
-    clean_tweets = PythonOperator(task_id='clean_tweets', 
python_callable=cleantweets)
-
-    clean_tweets << fetch_tweets
-
-    # 
--------------------------------------------------------------------------------
-    # In this section you can use a script to analyze the twitter data. Could 
simply
-    # be a sentiment analysis through algorithms like bag of words or 
something more
-    # complicated. You can also take a look at Web Services to do such tasks
-    # 
--------------------------------------------------------------------------------
-
-    analyze_tweets = PythonOperator(task_id='analyze_tweets', 
python_callable=analyzetweets)
-
-    analyze_tweets << clean_tweets
-
-    # 
--------------------------------------------------------------------------------
-    # Although this is the last task, we need to declare it before the next 
tasks as we
-    # will use set_downstream This task will extract summary from Hive data 
and store
-    # it to MySQL
-    # 
--------------------------------------------------------------------------------
-
-    hive_to_mysql = PythonOperator(task_id='hive_to_mysql', 
python_callable=transfertodb)
+    fetch_tweets >> clean_tweets >> analyze_tweets
 
     # 
--------------------------------------------------------------------------------
     # The following tasks are generated using for loop. The first task puts 
the eight
@@ -137,49 +106,42 @@ with DAG(
 
     for channel in to_channels:
 
-        file_name = "to_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + 
".csv"
+        file_name = f"to_{channel}_{dt}.csv"
 
         load_to_hdfs = BashOperator(
-            task_id="put_" + channel + "_to_hdfs",
-            bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
-            + local_dir
-            + file_name
-            + hdfs_dir
-            + channel
-            + "/",
+            task_id=f"put_{channel}_to_hdfs",
+            bash_command=(
+                f"HADOOP_USER_NAME=hdfs hadoop fs -put -f 
{local_dir}{file_name}{hdfs_dir}{channel}/"
+            ),
         )
 
-        load_to_hdfs << analyze_tweets
-
         load_to_hive = HiveOperator(
-            task_id="load_" + channel + "_to_hive",
-            hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + 
"' "
-            "INTO TABLE " + channel + " "
-            "PARTITION(dt='" + dt + "')",
+            task_id=f"load_{channel}_to_hive",
+            hql=(
+                f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}'"
+                f"INTO TABLE {channel}"
+                f"PARTITION(dt='{dt}')"
+            ),
         )
-        load_to_hive << load_to_hdfs
-        load_to_hive >> hive_to_mysql
+
+        analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql
 
     for channel in from_channels:
-        file_name = "from_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + 
".csv"
+        file_name = f"from_{channel}_{dt}.csv"
         load_to_hdfs = BashOperator(
-            task_id="put_" + channel + "_to_hdfs",
-            bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f "
-            + local_dir
-            + file_name
-            + hdfs_dir
-            + channel
-            + "/",
+            task_id=f"put_{channel}_to_hdfs",
+            bash_command=(
+                f"HADOOP_USER_NAME=hdfs hadoop fs -put -f 
{local_dir}{file_name}{hdfs_dir}{channel}/"
+            ),
         )
 
-        load_to_hdfs << analyze_tweets
-
         load_to_hive = HiveOperator(
-            task_id="load_" + channel + "_to_hive",
-            hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + 
"' "
-            "INTO TABLE " + channel + " "
-            "PARTITION(dt='" + dt + "')",
+            task_id=f"load_{channel}_to_hive",
+            hql=(
+                f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' "
+                f"INTO TABLE {channel} "
+                f"PARTITION(dt='{dt}')"
+            ),
         )
 
-        load_to_hive << load_to_hdfs
-        load_to_hive >> hive_to_mysql
+        analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql
diff --git a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py 
b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
index 0e8becd..22ae853 100644
--- a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
+++ b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
@@ -21,7 +21,6 @@ This is an example DAG which uses the KylinCubeOperator.
 The tasks below include kylin build, refresh, merge operation.
 """
 from airflow import DAG
-from airflow.operators.python import PythonOperator
 from airflow.providers.apache.kylin.operators.kylin_cube import 
KylinCubeOperator
 from airflow.utils.dates import days_ago
 
@@ -29,42 +28,37 @@ dag = DAG(
     dag_id='example_kylin_operator',
     schedule_interval=None,
     start_date=days_ago(1),
+    default_args={'kylin_conn_id': 'kylin_default', 'project': 'learn_kylin', 
'cube': 'kylin_sales_cube'},
     tags=['example'],
 )
 
 
-def gen_build_time(**kwargs):
+@dag.task
+def gen_build_time():
     """
-    Gen build time and push to xcom
-    :param kwargs:
-    :return:
+    Gen build time and push to XCom (with key of "return_value")
+    :return: A dict with build time values.
     """
-    ti = kwargs['ti']
-    ti.xcom_push(key='date_start', value='1325347200000')
-    ti.xcom_push(key='date_end', value='1325433600000')
+    return {'date_start': '1325347200000', 'date_end': '1325433600000'}
 
 
-gen_build_time_task = PythonOperator(python_callable=gen_build_time, 
task_id='gen_build_time', dag=dag)
+gen_build_time = gen_build_time()
+gen_build_time_output_date_start = gen_build_time['date_start']
+gen_build_time_output_date_end = gen_build_time['date_end']
 
 build_task1 = KylinCubeOperator(
     task_id="kylin_build_1",
-    kylin_conn_id='kylin_default',
-    project='learn_kylin',
-    cube='kylin_sales_cube',
     command='build',
-    start_time=gen_build_time_task.output['date_start'],
-    end_time=gen_build_time_task.output['date_end'],
+    start_time=gen_build_time_output_date_start,
+    end_time=gen_build_time_output_date_end,
     is_track_job=True,
     dag=dag,
 )
 
 build_task2 = KylinCubeOperator(
     task_id="kylin_build_2",
-    kylin_conn_id='kylin_default',
-    project='learn_kylin',
-    cube='kylin_sales_cube',
     command='build',
-    start_time='1325433600000',
+    start_time=gen_build_time_output_date_end,
     end_time='1325520000000',
     is_track_job=True,
     dag=dag,
@@ -72,23 +66,17 @@ build_task2 = KylinCubeOperator(
 
 refresh_task1 = KylinCubeOperator(
     task_id="kylin_refresh_1",
-    kylin_conn_id='kylin_default',
-    project='learn_kylin',
-    cube='kylin_sales_cube',
     command='refresh',
-    start_time='1325347200000',
-    end_time='1325433600000',
+    start_time=gen_build_time_output_date_start,
+    end_time=gen_build_time_output_date_end,
     is_track_job=True,
     dag=dag,
 )
 
 merge_task = KylinCubeOperator(
     task_id="kylin_merge",
-    kylin_conn_id='kylin_default',
-    project='learn_kylin',
-    cube='kylin_sales_cube',
     command='merge',
-    start_time='1325347200000',
+    start_time=gen_build_time_output_date_start,
     end_time='1325520000000',
     is_track_job=True,
     dag=dag,
@@ -96,35 +84,29 @@ merge_task = KylinCubeOperator(
 
 disable_task = KylinCubeOperator(
     task_id="kylin_disable",
-    kylin_conn_id='kylin_default',
-    project='learn_kylin',
-    cube='kylin_sales_cube',
     command='disable',
     dag=dag,
 )
 
 purge_task = KylinCubeOperator(
     task_id="kylin_purge",
-    kylin_conn_id='kylin_default',
-    project='learn_kylin',
-    cube='kylin_sales_cube',
     command='purge',
     dag=dag,
 )
 
 build_task3 = KylinCubeOperator(
     task_id="kylin_build_3",
-    kylin_conn_id='kylin_default',
-    project='learn_kylin',
-    cube='kylin_sales_cube',
     command='build',
-    start_time='1325433600000',
-    end_time='1325520000000',
+    start_time=gen_build_time_output_date_end,
+    end_time='1328730000000',
     dag=dag,
 )
 
-build_task1 >> build_task2 >> refresh_task1 >> merge_task
-merge_task >> disable_task >> purge_task >> build_task3
+build_task1 >> build_task2 >> refresh_task1 >> merge_task >> disable_task >> 
purge_task >> build_task3
 
 # Task dependency created via `XComArgs`:
-#   gen_build_time_task >> build_task1
+#   gen_build_time >> build_task1
+#   gen_build_time >> build_task2
+#   gen_build_time >> refresh_task1
+#   gen_build_time >> merge_task
+#   gen_build_time >> build_task3
diff --git a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py 
b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
index 118abfb..334aac5 100644
--- a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
+++ b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py
@@ -18,7 +18,7 @@
 import os
 
 from airflow import models
-from airflow.operators.python import PythonOperator
+from airflow.decorators import task
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.s3_bucket import 
S3CreateBucketOperator, S3DeleteBucketOperator
 from airflow.providers.google.cloud.operators.gcs import 
GCSCreateBucketOperator, GCSDeleteBucketOperator
@@ -32,6 +32,7 @@ UPLOAD_FILE = '/tmp/example-file.txt'
 PREFIX = 'TESTS'
 
 
+@task(task_id='upload_file_to_s3')
 def upload_file():
     """A callable to upload file to AWS bucket"""
     s3_hook = S3Hook()
@@ -48,8 +49,6 @@ with models.DAG(
         task_id="create_s3_bucket", bucket_name=S3BUCKET_NAME, 
region_name='us-east-1'
     )
 
-    upload_to_s3 = PythonOperator(task_id='upload_file_to_s3', 
python_callable=upload_file)
-
     create_gcs_bucket = GCSCreateBucketOperator(
         task_id="create_bucket",
         bucket_name=GCS_BUCKET,
@@ -69,7 +68,7 @@ with models.DAG(
 
     (
         create_s3_bucket
-        >> upload_to_s3
+        >> upload_file()
         >> create_gcs_bucket
         >> transfer_to_gcs
         >> delete_s3_bucket
diff --git 
a/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py 
b/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py
index 66f4803..ce2c8ac 100644
--- a/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py
+++ b/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py
@@ -15,22 +15,21 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from datetime import datetime, timedelta
+from datetime import datetime
 
 from requests import Request
 
 from airflow import DAG
-from airflow.operators.python import PythonOperator
+from airflow.decorators import task
 from airflow.providers.jenkins.hooks.jenkins import JenkinsHook
 from airflow.providers.jenkins.operators.jenkins_job_trigger import 
JenkinsJobTriggerOperator
 
+JENKINS_CONNECTION_ID = "your_jenkins_connection"
+
 with DAG(
     "test_jenkins",
     default_args={
-        "owner": "airflow",
         "retries": 1,
-        "retry_delay": timedelta(minutes=5),
-        "depends_on_past": False,
         "concurrency": 8,
         "max_active_runs": 8,
     },
@@ -42,16 +41,17 @@ with DAG(
         job_name="generate-merlin-config",
         parameters={"first_parameter": "a_value", "second_parameter": "18"},
         # parameters="resources/parameter.json", You can also pass a path to a 
json file containing your param
-        jenkins_connection_id="your_jenkins_connection",  # T he connection 
must be configured first
+        jenkins_connection_id=JENKINS_CONNECTION_ID,  # The connection must be 
configured first
     )
 
+    @task
     def grab_artifact_from_jenkins(url):
         """
         Grab an artifact from the previous job
         The python-jenkins library doesn't expose a method for that
         But it's totally possible to build manually the request for that
         """
-        hook = JenkinsHook("your_jenkins_connection")
+        hook = JenkinsHook(JENKINS_CONNECTION_ID)
         jenkins_server = hook.get_jenkins_server()
         # The JenkinsJobTriggerOperator store the job url in the xcom variable 
corresponding to the task
         # You can then use it to access things or to get the job number
@@ -61,11 +61,7 @@ with DAG(
         response = jenkins_server.jenkins_open(request)
         return response  # We store the artifact content in a xcom variable 
for later use
 
-    artifact_grabber = PythonOperator(
-        task_id='artifact_grabber',
-        python_callable=grab_artifact_from_jenkins,
-        op_kwargs=dict(url=job_trigger.output),
-    )
+    grab_artifact_from_jenkins(job_trigger.output)
 
     # Task dependency created via `XComArgs`:
-    #   job_trigger >> artifact_grabber
+    #   job_trigger >> grab_artifact_from_jenkins()
diff --git 
a/airflow/providers/microsoft/azure/example_dags/example_fileshare.py 
b/airflow/providers/microsoft/azure/example_dags/example_fileshare.py
index e3263d6..9fbb0a9 100644
--- a/airflow/providers/microsoft/azure/example_dags/example_fileshare.py
+++ b/airflow/providers/microsoft/azure/example_dags/example_fileshare.py
@@ -15,8 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from airflow.decorators import task
 from airflow.models import DAG
-from airflow.operators.python import PythonOperator
 from airflow.providers.microsoft.azure.hooks.azure_fileshare import 
AzureFileShareHook
 from airflow.utils.dates import days_ago
 
@@ -24,6 +24,7 @@ NAME = 'myfileshare'
 DIRECTORY = "mydirectory"
 
 
+@task
 def create_fileshare():
     """Create a fileshare with directory"""
     hook = AzureFileShareHook()
@@ -34,6 +35,7 @@ def create_fileshare():
         raise Exception
 
 
+@task
 def delete_fileshare():
     """Delete a fileshare"""
     hook = AzureFileShareHook()
@@ -41,7 +43,4 @@ def delete_fileshare():
 
 
 with DAG("example_fileshare", schedule_interval="@once", 
start_date=days_ago(2)) as dag:
-    create = PythonOperator(task_id="create-share", 
python_callable=create_fileshare)
-    delete = PythonOperator(task_id="delete-share", 
python_callable=delete_fileshare)
-
-    create >> delete
+    create_fileshare() >> delete_fileshare()
diff --git a/airflow/providers/papermill/example_dags/example_papermill.py 
b/airflow/providers/papermill/example_dags/example_papermill.py
index 3ade5e2..1adb0e8 100644
--- a/airflow/providers/papermill/example_dags/example_papermill.py
+++ b/airflow/providers/papermill/example_dags/example_papermill.py
@@ -26,8 +26,8 @@ from datetime import timedelta
 import scrapbook as sb
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.lineage import AUTO
-from airflow.operators.python import PythonOperator
 from airflow.providers.papermill.operators.papermill import PapermillOperator
 from airflow.utils.dates import days_ago
 
@@ -48,6 +48,7 @@ with DAG(
     # [END howto_operator_papermill]
 
 
+@task
 def check_notebook(inlets, execution_date):
     """
     Verify the message in the notebook
@@ -76,6 +77,4 @@ with DAG(
         parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"},
     )
 
-    check_output = PythonOperator(task_id='check_out', 
python_callable=check_notebook, inlets=AUTO)
-
-    check_output.set_upstream(run_this)
+    run_this >> check_notebook(inlets=AUTO, execution_date="{{ execution_date 
}}")
diff --git a/airflow/providers/qubole/example_dags/example_qubole.py 
b/airflow/providers/qubole/example_dags/example_qubole.py
index 9162753..3b3fc86 100644
--- a/airflow/providers/qubole/example_dags/example_qubole.py
+++ b/airflow/providers/qubole/example_dags/example_qubole.py
@@ -21,23 +21,15 @@ import random
 import textwrap
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.dummy import DummyOperator
-from airflow.operators.python import BranchPythonOperator, PythonOperator
+from airflow.operators.python import BranchPythonOperator
 from airflow.providers.qubole.operators.qubole import QuboleOperator
 from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, 
QubolePartitionSensor
 from airflow.utils.dates import days_ago
 
-default_args = {
-    'owner': 'airflow',
-    'depends_on_past': False,
-    'email': ['airf...@example.com'],
-    'email_on_failure': False,
-    'email_on_retry': False,
-}
-
 with DAG(
     dag_id='example_qubole_operator',
-    default_args=default_args,
     schedule_interval=None,
     start_date=days_ago(2),
     tags=['example'],
@@ -55,12 +47,17 @@ with DAG(
         """
     )
 
-    def compare_result_fn(ti):
+    @task(trigger_rule='all_done')
+    def compare_result(hive_show_table, hive_s3_location, ti=None):
         """
         Compares the results of two QuboleOperator tasks.
 
-        :param kwargs: The context of the executed task.
-        :type kwargs: dict
+        :param hive_show_table: The "hive_show_table" task.
+        :type hive_show_table: QuboleOperator
+        :param hive_s3_location: The "hive_s3_location" task.
+        :type hive_s3_location: QuboleOperator
+        :param ti: The TaskInstance object.
+        :type ti: airflow.models.TaskInstance
         :return: True if the files are the same, False otherwise.
         :rtype: bool
         """
@@ -93,20 +90,13 @@ with DAG(
         tags=['tag1', 'tag2'],
         # If the script at s3 location has any qubole specific macros to be 
replaced
         # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
-        trigger_rule="all_done",
-    )
-
-    compare_result = PythonOperator(
-        task_id='compare_result', python_callable=compare_result_fn, 
trigger_rule="all_done"
     )
 
-    compare_result << [hive_show_table, hive_s3_location]
-
     options = ['hadoop_jar_cmd', 'presto_cmd', 'db_query', 'spark_cmd']
 
     branching = BranchPythonOperator(task_id='branching', 
python_callable=lambda: random.choice(options))
 
-    branching << compare_result
+    [hive_show_table, hive_s3_location] >> compare_result(hive_s3_location, 
hive_show_table) >> branching
 
     join = DummyOperator(task_id='join', trigger_rule='one_success')
 
@@ -131,11 +121,9 @@ with DAG(
         command_type="pigcmd",
         
script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
         parameters="key1=value1 key2=value2",
-        trigger_rule="all_done",
     )
 
-    pig_cmd << hadoop_jar_cmd << branching
-    pig_cmd >> join
+    branching >> hadoop_jar_cmd >> pig_cmd >> join
 
     presto_cmd = QuboleOperator(task_id='presto_cmd', 
command_type='prestocmd', query='show tables')
 
@@ -144,11 +132,9 @@ with DAG(
         command_type="shellcmd",
         script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
         parameters="param1 param2",
-        trigger_rule="all_done",
     )
 
-    shell_cmd << presto_cmd << branching
-    shell_cmd >> join
+    branching >> presto_cmd >> shell_cmd >> join
 
     db_query = QuboleOperator(
         task_id='db_query', command_type='dbtapquerycmd', query='show tables', 
db_tap_id=2064
@@ -162,11 +148,9 @@ with DAG(
         db_table='exported_airline_origin_destination',
         partition_spec='dt=20110104-02',
         dbtap_id=2064,
-        trigger_rule="all_done",
     )
 
-    db_export << db_query << branching
-    db_export >> join
+    branching >> db_query >> db_export >> join
 
     db_import = QuboleOperator(
         task_id='db_import',
@@ -177,7 +161,6 @@ with DAG(
         where_clause='id < 10',
         parallelism=2,
         dbtap_id=2064,
-        trigger_rule="all_done",
     )
 
     prog = '''
@@ -213,12 +196,10 @@ with DAG(
         tags='airflow_example_run',
     )
 
-    spark_cmd << db_import << branching
-    spark_cmd >> join
+    branching >> db_import >> spark_cmd >> join
 
 with DAG(
     dag_id='example_qubole_sensor',
-    default_args=default_args,
     schedule_interval=None,
     start_date=days_ago(2),
     doc_md=__doc__,
diff --git a/airflow/providers/sqlite/example_dags/create_table.sql 
b/airflow/providers/sqlite/example_dags/create_table.sql
new file mode 100644
index 0000000..0468338
--- /dev/null
+++ b/airflow/providers/sqlite/example_dags/create_table.sql
@@ -0,0 +1,24 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+*/
+
+CREATE TABLE Items (
+    item_id INT PRIMARY KEY,
+    name TEXT,
+    description TEXT
+)
diff --git a/airflow/providers/sqlite/example_dags/example_sqlite.py 
b/airflow/providers/sqlite/example_dags/example_sqlite.py
index 2ff5114..5799871 100644
--- a/airflow/providers/sqlite/example_dags/example_sqlite.py
+++ b/airflow/providers/sqlite/example_dags/example_sqlite.py
@@ -24,7 +24,6 @@ The second task is similar but instead calls the SQL command 
from an external fi
 """
 
 from airflow import DAG
-from airflow.operators.python import PythonOperator
 from airflow.providers.sqlite.hooks.sqlite import SqliteHook
 from airflow.providers.sqlite.operators.sqlite import SqliteOperator
 from airflow.utils.dates import days_ago
@@ -41,12 +40,11 @@ dag = DAG(
 # Example of creating a task that calls a common CREATE TABLE sql command.
 create_table_sqlite_task = SqliteOperator(
     task_id='create_table_sqlite',
-    sqlite_conn_id='sqlite_conn_id',
     sql=r"""
-    CREATE TABLE table_name (
-        column_1 string,
-        column_2 string,
-        column_3 string
+    CREATE TABLE Customers (
+        customer_id INT PRIMARY KEY,
+        first_name TEXT,
+        last_name TEXT
     );
     """,
     dag=dag,
@@ -55,37 +53,32 @@ create_table_sqlite_task = SqliteOperator(
 # [END howto_operator_sqlite]
 
 
+@dag.task(task_id="insert_sqlite_task")
 def insert_sqlite_hook():
     sqlite_hook = SqliteHook("sqlite_default")
-    sqlite_hook.get_conn()
 
     rows = [('James', '11'), ('James', '22'), ('James', '33')]
     target_fields = ['first_name', 'last_name']
-    sqlite_hook.insert_rows(table='Customer', rows=rows, 
target_fields=target_fields)
+    sqlite_hook.insert_rows(table='Customers', rows=rows, 
target_fields=target_fields)
 
 
+@dag.task(task_id="replace_sqlite_task")
 def replace_sqlite_hook():
     sqlite_hook = SqliteHook("sqlite_default")
-    sqlite_hook.get_conn()
 
     rows = [('James', '11'), ('James', '22'), ('James', '33')]
     target_fields = ['first_name', 'last_name']
-    sqlite_hook.insert_rows(table='Customer', rows=rows, 
target_fields=target_fields, replace=True)
+    sqlite_hook.insert_rows(table='Customers', rows=rows, 
target_fields=target_fields, replace=True)
 
 
-insert_sqlite_task = PythonOperator(task_id="insert_sqlite_task", 
python_callable=insert_sqlite_hook)
-replace_sqlite_task = PythonOperator(task_id="replace_sqlite_task", 
python_callable=replace_sqlite_hook)
-
 # [START howto_operator_sqlite_external_file]
 
 # Example of creating a task that calls an sql command from an external file.
 external_create_table_sqlite_task = SqliteOperator(
     task_id='create_table_sqlite_external_file',
-    sqlite_conn_id='sqlite_conn_id',
-    sql='/scripts/create_table.sql',
-    dag=dag,
+    sql='create_table.sql',
 )
 
 # [END howto_operator_sqlite_external_file]
 
-create_table_sqlite_task >> external_create_table_sqlite_task >> 
insert_sqlite_task >> replace_sqlite_task
+create_table_sqlite_task >> external_create_table_sqlite_task >> 
insert_sqlite_hook() >> replace_sqlite_hook()

Reply via email to