This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a458fcc  Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
a458fcc is described below

commit a458fcc573845ff65244a2dafd204ed70129f3e8
Author: Josh Fell <48934154+josh-f...@users.noreply.github.com>
AuthorDate: Mon Sep 27 12:19:23 2021 -0400

    Updating miscellaneous provider DAGs to use TaskFlow API where applicable (#18278)
---
 .../amazon/aws/example_dags/example_s3_bucket.py   |  11 +-
 .../aws/example_dags/example_s3_to_redshift.py     |  31 +++--
 .../hive/example_dags/example_twitter_README.md    |   2 +-
 .../hive/example_dags/example_twitter_dag.py       | 134 ++++++++-------------
 .../apache/kylin/example_dags/example_kylin_dag.py |  64 ++++------
 .../google/cloud/example_dags/example_s3_to_gcs.py |   7 +-
 .../example_dags/example_jenkins_job_trigger.py    |  22 ++--
 .../azure/example_dags/example_fileshare.py        |   9 +-
 .../papermill/example_dags/example_papermill.py    |   7 +-
 .../qubole/example_dags/example_qubole.py          |  49 +++-----
 .../providers/sqlite/example_dags/create_table.sql |  24 ++++
 .../sqlite/example_dags/example_sqlite.py          |  27 ++---
 12 files changed, 160 insertions(+), 227 deletions(-)

diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
index ceeb4b2..ca226bc 100644
--- a/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
+++ b/airflow/providers/amazon/aws/example_dags/example_s3_bucket.py
@@ -16,8 +16,8 @@
 # under the License.
import os +from airflow.decorators import task from airflow.models.dag import DAG -from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator from airflow.utils.dates import days_ago @@ -25,6 +25,7 @@ from airflow.utils.dates import days_ago BUCKET_NAME = os.environ.get('BUCKET_NAME', 'test-airflow-12345') +@task(task_id="s3_bucket_dag_add_keys_to_bucket") def upload_keys(): """This is a python callback to add keys into the s3 bucket""" # add keys to bucket @@ -41,6 +42,7 @@ with DAG( dag_id='s3_bucket_dag', schedule_interval=None, start_date=days_ago(2), + default_args={"bucket_name": BUCKET_NAME}, max_active_runs=1, tags=['example'], ) as dag: @@ -48,17 +50,14 @@ with DAG( # [START howto_operator_s3_bucket] create_bucket = S3CreateBucketOperator( task_id='s3_bucket_dag_create', - bucket_name=BUCKET_NAME, region_name='us-east-1', ) - add_keys_to_bucket = PythonOperator( - task_id="s3_bucket_dag_add_keys_to_bucket", python_callable=upload_keys - ) + # Using a task-decorated function to add keys + add_keys_to_bucket = upload_keys() delete_bucket = S3DeleteBucketOperator( task_id='s3_bucket_dag_delete', - bucket_name=BUCKET_NAME, force_delete=True, ) # [END howto_operator_s3_bucket] diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py index 1a7c911..9cec527 100644 --- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py +++ b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py @@ -22,7 +22,8 @@ This is an example dag for using `S3ToRedshiftOperator` to copy a S3 key into a from os import getenv from airflow import DAG -from airflow.operators.python import PythonOperator +from airflow.decorators import task +from airflow.models.baseoperator import chain from 
airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator from airflow.providers.postgres.operators.postgres import PostgresOperator @@ -35,12 +36,14 @@ REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table") # [END howto_operator_s3_to_redshift_env_variables] -def _add_sample_data_to_s3(): +@task(task_id='setup__add_sample_data_to_s3') +def add_sample_data_to_s3(): s3_hook = S3Hook() s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET, replace=True) -def _remove_sample_data_from_s3(): +@task(task_id='teardown__remove_sample_data_from_s3') +def remove_sample_data_from_s3(): s3_hook = S3Hook() if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET): s3_hook.delete_objects(S3_BUCKET, f'{S3_KEY}/{REDSHIFT_TABLE}') @@ -49,9 +52,8 @@ def _remove_sample_data_from_s3(): with DAG( dag_id="example_s3_to_redshift", start_date=days_ago(1), schedule_interval=None, tags=['example'] ) as dag: - setup__task_add_sample_data_to_s3 = PythonOperator( - python_callable=_add_sample_data_to_s3, task_id='setup__add_sample_data_to_s3' - ) + add_sample_data_to_s3 = add_sample_data_to_s3() + setup__task_create_table = PostgresOperator( sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)', postgres_conn_id='redshift_default', @@ -72,14 +74,11 @@ with DAG( postgres_conn_id='redshift_default', task_id='teardown__drop_table', ) - teardown__task_remove_sample_data_from_s3 = PythonOperator( - python_callable=_remove_sample_data_from_s3, task_id='teardown__remove_sample_data_from_s3' - ) - ( - [setup__task_add_sample_data_to_s3, setup__task_create_table] - >> task_transfer_s3_to_redshift - >> [ - teardown__task_drop_table, - teardown__task_remove_sample_data_from_s3, - ] + + remove_sample_data_from_s3 = remove_sample_data_from_s3() + + chain( + [add_sample_data_to_s3, setup__task_create_table], + task_transfer_s3_to_redshift, + [teardown__task_drop_table, 
remove_sample_data_from_s3], ) diff --git a/airflow/providers/apache/hive/example_dags/example_twitter_README.md b/airflow/providers/apache/hive/example_dags/example_twitter_README.md index c22ca2c..8f88983 100644 --- a/airflow/providers/apache/hive/example_dags/example_twitter_README.md +++ b/airflow/providers/apache/hive/example_dags/example_twitter_README.md @@ -35,7 +35,7 @@ ***Screenshot:*** <img src="http://i.imgur.com/rRpSO12.png" width="99%"/> -***Example Structure:*** In this example dag, we are collecting tweets for four users account or twitter handle. Each twitter handle has two channels, incoming tweets and outgoing tweets. Hence, in this example, by running the fetch_tweet task, we should have eight output files. For better management, each of the eight output files should be saved with the yesterday's date (we are collecting tweets from yesterday), i.e. toTwitter_A_2016-03-21.csv. We are using three kind of operators: Py [...] +***Example Structure:*** In this example dag, we are collecting tweets for four users account or twitter handle. Each twitter handle has two channels, incoming tweets and outgoing tweets. Hence, in this example, by running the fetch_tweet task, we should have eight output files. For better management, each of the eight output files should be saved with the yesterday's date (we are collecting tweets from yesterday), i.e. toTwitter_A_2016-03-21.csv. We are using two kinds of operators (Bas [...] The python functions here are just placeholders. In case you are interested to actually make this DAG fully functional, first start with filling out the scripts as separate files and importing them into the DAG with absolute or relative import. My approach was to store the retrieved data in memory using Pandas dataframe first, and then use the built in method to save the CSV file on hard-disk. The eight different CSV files are then put into eight different folders within HDFS. 
Each of the newly inserted files are then loaded into eight different external hive tables. Hive tables can be external or internal. In this case, we are inserting the data right into the table, and so we are making our tables internal. Each file is inserted into the respected Hive table named after the twitter channel, i.e. toTwitter_A or fromTwitter_A. It is also important to note that when we created [...] diff --git a/airflow/providers/apache/hive/example_dags/example_twitter_dag.py b/airflow/providers/apache/hive/example_dags/example_twitter_dag.py index de67457..d501706 100644 --- a/airflow/providers/apache/hive/example_dags/example_twitter_dag.py +++ b/airflow/providers/apache/hive/example_dags/example_twitter_dag.py @@ -31,38 +31,42 @@ This is an example dag for managing twitter data. from datetime import date, timedelta from airflow import DAG +from airflow.decorators import task from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator from airflow.providers.apache.hive.operators.hive import HiveOperator from airflow.utils.dates import days_ago -# -------------------------------------------------------------------------------- -# Create a few placeholder scripts. In practice these would be different python -# script files, which are imported in this section with absolute or relative imports -# -------------------------------------------------------------------------------- - -def fetchtweets(): +@task +def fetch_tweets(): """ - This is a placeholder for fetchtweets. + This task should call Twitter API and retrieve tweets from yesterday from and to for the four twitter + users (Twitter_A,..,Twitter_D) There should be eight csv output files generated by this task and naming + convention is direction(from or to)_twitterHandle_date.csv """ -def cleantweets(): +@task +def clean_tweets(): """ - This is a placeholder for cleantweets. + This is a placeholder to clean the eight files. 
In this step you can get rid of or cherry pick columns + and different parts of the text. """ -def analyzetweets(): +@task +def analyze_tweets(): """ - This is a placeholder for analyzetweets. + This is a placeholder to analyze the twitter data. Could simply be a sentiment analysis through algorithms + like bag of words or something more complicated. You can also take a look at Web Services to do such + tasks. """ -def transfertodb(): +@task +def transfer_to_db(): """ - This is a placeholder for transfertodb. + This is a placeholder to extract summary from Hive data and store it to MySQL. """ @@ -70,53 +74,18 @@ with DAG( dag_id='example_twitter_dag', default_args={ 'owner': 'Ekhtiar', - 'depends_on_past': False, - 'email': ['airf...@example.com'], - 'email_on_failure': False, - 'email_on_retry': False, 'retries': 1, - 'retry_delay': timedelta(minutes=5), }, schedule_interval="@daily", start_date=days_ago(5), tags=['example'], ) as dag: + fetch_tweets = fetch_tweets() + clean_tweets = clean_tweets() + analyze_tweets = analyze_tweets() + hive_to_mysql = transfer_to_db() - # -------------------------------------------------------------------------------- - # This task should call Twitter API and retrieve tweets from yesterday from and to - # for the four twitter users (Twitter_A,..,Twitter_D) There should be eight csv - # output files generated by this task and naming convention - # is direction(from or to)_twitterHandle_date.csv - # -------------------------------------------------------------------------------- - - fetch_tweets = PythonOperator(task_id='fetch_tweets', python_callable=fetchtweets) - - # -------------------------------------------------------------------------------- - # Clean the eight files. 
In this step you can get rid of or cherry pick columns - # and different parts of the text - # -------------------------------------------------------------------------------- - - clean_tweets = PythonOperator(task_id='clean_tweets', python_callable=cleantweets) - - clean_tweets << fetch_tweets - - # -------------------------------------------------------------------------------- - # In this section you can use a script to analyze the twitter data. Could simply - # be a sentiment analysis through algorithms like bag of words or something more - # complicated. You can also take a look at Web Services to do such tasks - # -------------------------------------------------------------------------------- - - analyze_tweets = PythonOperator(task_id='analyze_tweets', python_callable=analyzetweets) - - analyze_tweets << clean_tweets - - # -------------------------------------------------------------------------------- - # Although this is the last task, we need to declare it before the next tasks as we - # will use set_downstream This task will extract summary from Hive data and store - # it to MySQL - # -------------------------------------------------------------------------------- - - hive_to_mysql = PythonOperator(task_id='hive_to_mysql', python_callable=transfertodb) + fetch_tweets >> clean_tweets >> analyze_tweets # -------------------------------------------------------------------------------- # The following tasks are generated using for loop. 
The first task puts the eight @@ -137,49 +106,42 @@ with DAG( for channel in to_channels: - file_name = "to_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv" + file_name = f"to_{channel}_{dt}.csv" load_to_hdfs = BashOperator( - task_id="put_" + channel + "_to_hdfs", - bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " - + local_dir - + file_name - + hdfs_dir - + channel - + "/", + task_id=f"put_{channel}_to_hdfs", + bash_command=( + f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/" + ), ) - load_to_hdfs << analyze_tweets - load_to_hive = HiveOperator( - task_id="load_" + channel + "_to_hive", - hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + "' " - "INTO TABLE " + channel + " " - "PARTITION(dt='" + dt + "')", + task_id=f"load_{channel}_to_hive", + hql=( + f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}'" + f"INTO TABLE {channel}" + f"PARTITION(dt='{dt}')" + ), ) - load_to_hive << load_to_hdfs - load_to_hive >> hive_to_mysql + + analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql for channel in from_channels: - file_name = "from_" + channel + "_" + yesterday.strftime("%Y-%m-%d") + ".csv" + file_name = f"from_{channel}_{dt}.csv" load_to_hdfs = BashOperator( - task_id="put_" + channel + "_to_hdfs", - bash_command="HADOOP_USER_NAME=hdfs hadoop fs -put -f " - + local_dir - + file_name - + hdfs_dir - + channel - + "/", + task_id=f"put_{channel}_to_hdfs", + bash_command=( + f"HADOOP_USER_NAME=hdfs hadoop fs -put -f {local_dir}{file_name}{hdfs_dir}{channel}/" + ), ) - load_to_hdfs << analyze_tweets - load_to_hive = HiveOperator( - task_id="load_" + channel + "_to_hive", - hql="LOAD DATA INPATH '" + hdfs_dir + channel + "/" + file_name + "' " - "INTO TABLE " + channel + " " - "PARTITION(dt='" + dt + "')", + task_id=f"load_{channel}_to_hive", + hql=( + f"LOAD DATA INPATH '{hdfs_dir}{channel}/{file_name}' " + f"INTO TABLE {channel} " + f"PARTITION(dt='{dt}')" + ), ) - load_to_hive << 
load_to_hdfs - load_to_hive >> hive_to_mysql + analyze_tweets >> load_to_hdfs >> load_to_hive >> hive_to_mysql diff --git a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py index 0e8becd..22ae853 100644 --- a/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py +++ b/airflow/providers/apache/kylin/example_dags/example_kylin_dag.py @@ -21,7 +21,6 @@ This is an example DAG which uses the KylinCubeOperator. The tasks below include kylin build, refresh, merge operation. """ from airflow import DAG -from airflow.operators.python import PythonOperator from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator from airflow.utils.dates import days_ago @@ -29,42 +28,37 @@ dag = DAG( dag_id='example_kylin_operator', schedule_interval=None, start_date=days_ago(1), + default_args={'kylin_conn_id': 'kylin_default', 'project': 'learn_kylin', 'cube': 'kylin_sales_cube'}, tags=['example'], ) -def gen_build_time(**kwargs): +@dag.task +def gen_build_time(): """ - Gen build time and push to xcom - :param kwargs: - :return: + Gen build time and push to XCom (with key of "return_value") + :return: A dict with build time values. 
""" - ti = kwargs['ti'] - ti.xcom_push(key='date_start', value='1325347200000') - ti.xcom_push(key='date_end', value='1325433600000') + return {'date_start': '1325347200000', 'date_end': '1325433600000'} -gen_build_time_task = PythonOperator(python_callable=gen_build_time, task_id='gen_build_time', dag=dag) +gen_build_time = gen_build_time() +gen_build_time_output_date_start = gen_build_time['date_start'] +gen_build_time_output_date_end = gen_build_time['date_end'] build_task1 = KylinCubeOperator( task_id="kylin_build_1", - kylin_conn_id='kylin_default', - project='learn_kylin', - cube='kylin_sales_cube', command='build', - start_time=gen_build_time_task.output['date_start'], - end_time=gen_build_time_task.output['date_end'], + start_time=gen_build_time_output_date_start, + end_time=gen_build_time_output_date_end, is_track_job=True, dag=dag, ) build_task2 = KylinCubeOperator( task_id="kylin_build_2", - kylin_conn_id='kylin_default', - project='learn_kylin', - cube='kylin_sales_cube', command='build', - start_time='1325433600000', + start_time=gen_build_time_output_date_end, end_time='1325520000000', is_track_job=True, dag=dag, @@ -72,23 +66,17 @@ build_task2 = KylinCubeOperator( refresh_task1 = KylinCubeOperator( task_id="kylin_refresh_1", - kylin_conn_id='kylin_default', - project='learn_kylin', - cube='kylin_sales_cube', command='refresh', - start_time='1325347200000', - end_time='1325433600000', + start_time=gen_build_time_output_date_start, + end_time=gen_build_time_output_date_end, is_track_job=True, dag=dag, ) merge_task = KylinCubeOperator( task_id="kylin_merge", - kylin_conn_id='kylin_default', - project='learn_kylin', - cube='kylin_sales_cube', command='merge', - start_time='1325347200000', + start_time=gen_build_time_output_date_start, end_time='1325520000000', is_track_job=True, dag=dag, @@ -96,35 +84,29 @@ merge_task = KylinCubeOperator( disable_task = KylinCubeOperator( task_id="kylin_disable", - kylin_conn_id='kylin_default', - project='learn_kylin', 
- cube='kylin_sales_cube', command='disable', dag=dag, ) purge_task = KylinCubeOperator( task_id="kylin_purge", - kylin_conn_id='kylin_default', - project='learn_kylin', - cube='kylin_sales_cube', command='purge', dag=dag, ) build_task3 = KylinCubeOperator( task_id="kylin_build_3", - kylin_conn_id='kylin_default', - project='learn_kylin', - cube='kylin_sales_cube', command='build', - start_time='1325433600000', - end_time='1325520000000', + start_time=gen_build_time_output_date_end, + end_time='1328730000000', dag=dag, ) -build_task1 >> build_task2 >> refresh_task1 >> merge_task -merge_task >> disable_task >> purge_task >> build_task3 +build_task1 >> build_task2 >> refresh_task1 >> merge_task >> disable_task >> purge_task >> build_task3 # Task dependency created via `XComArgs`: -# gen_build_time_task >> build_task1 +# gen_build_time >> build_task1 +# gen_build_time >> build_task2 +# gen_build_time >> refresh_task1 +# gen_build_time >> merge_task +# gen_build_time >> build_task3 diff --git a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py index 118abfb..334aac5 100644 --- a/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py +++ b/airflow/providers/google/cloud/example_dags/example_s3_to_gcs.py @@ -18,7 +18,7 @@ import os from airflow import models -from airflow.operators.python import PythonOperator +from airflow.decorators import task from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.operators.s3_bucket import S3CreateBucketOperator, S3DeleteBucketOperator from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator @@ -32,6 +32,7 @@ UPLOAD_FILE = '/tmp/example-file.txt' PREFIX = 'TESTS' +@task(task_id='upload_file_to_s3') def upload_file(): """A callable to upload file to AWS bucket""" s3_hook = S3Hook() @@ -48,8 +49,6 @@ with models.DAG( task_id="create_s3_bucket", 
bucket_name=S3BUCKET_NAME, region_name='us-east-1' ) - upload_to_s3 = PythonOperator(task_id='upload_file_to_s3', python_callable=upload_file) - create_gcs_bucket = GCSCreateBucketOperator( task_id="create_bucket", bucket_name=GCS_BUCKET, @@ -69,7 +68,7 @@ with models.DAG( ( create_s3_bucket - >> upload_to_s3 + >> upload_file() >> create_gcs_bucket >> transfer_to_gcs >> delete_s3_bucket diff --git a/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py b/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py index 66f4803..ce2c8ac 100644 --- a/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py +++ b/airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py @@ -15,22 +15,21 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from datetime import datetime, timedelta +from datetime import datetime from requests import Request from airflow import DAG -from airflow.operators.python import PythonOperator +from airflow.decorators import task from airflow.providers.jenkins.hooks.jenkins import JenkinsHook from airflow.providers.jenkins.operators.jenkins_job_trigger import JenkinsJobTriggerOperator +JENKINS_CONNECTION_ID = "your_jenkins_connection" + with DAG( "test_jenkins", default_args={ - "owner": "airflow", "retries": 1, - "retry_delay": timedelta(minutes=5), - "depends_on_past": False, "concurrency": 8, "max_active_runs": 8, }, @@ -42,16 +41,17 @@ with DAG( job_name="generate-merlin-config", parameters={"first_parameter": "a_value", "second_parameter": "18"}, # parameters="resources/parameter.json", You can also pass a path to a json file containing your param - jenkins_connection_id="your_jenkins_connection", # T he connection must be configured first + jenkins_connection_id=JENKINS_CONNECTION_ID, # The connection must be configured first ) + @task def grab_artifact_from_jenkins(url): """ Grab an artifact from the 
previous job The python-jenkins library doesn't expose a method for that But it's totally possible to build manually the request for that """ - hook = JenkinsHook("your_jenkins_connection") + hook = JenkinsHook(JENKINS_CONNECTION_ID) jenkins_server = hook.get_jenkins_server() # The JenkinsJobTriggerOperator store the job url in the xcom variable corresponding to the task # You can then use it to access things or to get the job number @@ -61,11 +61,7 @@ with DAG( response = jenkins_server.jenkins_open(request) return response # We store the artifact content in a xcom variable for later use - artifact_grabber = PythonOperator( - task_id='artifact_grabber', - python_callable=grab_artifact_from_jenkins, - op_kwargs=dict(url=job_trigger.output), - ) + grab_artifact_from_jenkins(job_trigger.output) # Task dependency created via `XComArgs`: - # job_trigger >> artifact_grabber + # job_trigger >> grab_artifact_from_jenkins() diff --git a/airflow/providers/microsoft/azure/example_dags/example_fileshare.py b/airflow/providers/microsoft/azure/example_dags/example_fileshare.py index e3263d6..9fbb0a9 100644 --- a/airflow/providers/microsoft/azure/example_dags/example_fileshare.py +++ b/airflow/providers/microsoft/azure/example_dags/example_fileshare.py @@ -15,8 +15,8 @@ # specific language governing permissions and limitations # under the License. 
+from airflow.decorators import task from airflow.models import DAG -from airflow.operators.python import PythonOperator from airflow.providers.microsoft.azure.hooks.azure_fileshare import AzureFileShareHook from airflow.utils.dates import days_ago @@ -24,6 +24,7 @@ NAME = 'myfileshare' DIRECTORY = "mydirectory" +@task def create_fileshare(): """Create a fileshare with directory""" hook = AzureFileShareHook() @@ -34,6 +35,7 @@ def create_fileshare(): raise Exception +@task def delete_fileshare(): """Delete a fileshare""" hook = AzureFileShareHook() @@ -41,7 +43,4 @@ def delete_fileshare(): with DAG("example_fileshare", schedule_interval="@once", start_date=days_ago(2)) as dag: - create = PythonOperator(task_id="create-share", python_callable=create_fileshare) - delete = PythonOperator(task_id="delete-share", python_callable=delete_fileshare) - - create >> delete + create_fileshare() >> delete_fileshare() diff --git a/airflow/providers/papermill/example_dags/example_papermill.py b/airflow/providers/papermill/example_dags/example_papermill.py index 3ade5e2..1adb0e8 100644 --- a/airflow/providers/papermill/example_dags/example_papermill.py +++ b/airflow/providers/papermill/example_dags/example_papermill.py @@ -26,8 +26,8 @@ from datetime import timedelta import scrapbook as sb from airflow import DAG +from airflow.decorators import task from airflow.lineage import AUTO -from airflow.operators.python import PythonOperator from airflow.providers.papermill.operators.papermill import PapermillOperator from airflow.utils.dates import days_ago @@ -48,6 +48,7 @@ with DAG( # [END howto_operator_papermill] +@task def check_notebook(inlets, execution_date): """ Verify the message in the notebook @@ -76,6 +77,4 @@ with DAG( parameters={"msgs": "Ran from Airflow at {{ execution_date }}!"}, ) - check_output = PythonOperator(task_id='check_out', python_callable=check_notebook, inlets=AUTO) - - check_output.set_upstream(run_this) + run_this >> check_notebook(inlets=AUTO, 
execution_date="{{ execution_date }}") diff --git a/airflow/providers/qubole/example_dags/example_qubole.py b/airflow/providers/qubole/example_dags/example_qubole.py index 9162753..3b3fc86 100644 --- a/airflow/providers/qubole/example_dags/example_qubole.py +++ b/airflow/providers/qubole/example_dags/example_qubole.py @@ -21,23 +21,15 @@ import random import textwrap from airflow import DAG +from airflow.decorators import task from airflow.operators.dummy import DummyOperator -from airflow.operators.python import BranchPythonOperator, PythonOperator +from airflow.operators.python import BranchPythonOperator from airflow.providers.qubole.operators.qubole import QuboleOperator from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor from airflow.utils.dates import days_ago -default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email': ['airf...@example.com'], - 'email_on_failure': False, - 'email_on_retry': False, -} - with DAG( dag_id='example_qubole_operator', - default_args=default_args, schedule_interval=None, start_date=days_ago(2), tags=['example'], @@ -55,12 +47,17 @@ with DAG( """ ) - def compare_result_fn(ti): + @task(trigger_rule='all_done') + def compare_result(hive_show_table, hive_s3_location, ti=None): """ Compares the results of two QuboleOperator tasks. - :param kwargs: The context of the executed task. - :type kwargs: dict + :param hive_show_table: The "hive_show_table" task. + :type hive_show_table: QuboleOperator + :param hive_s3_location: The "hive_s3_location" task. + :type hive_s3_location: QuboleOperator + :param ti: The TaskInstance object. + :type ti: airflow.models.TaskInstance :return: True if the files are the same, False otherwise. 
:rtype: bool """ @@ -93,20 +90,13 @@ with DAG( tags=['tag1', 'tag2'], # If the script at s3 location has any qubole specific macros to be replaced # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]', - trigger_rule="all_done", - ) - - compare_result = PythonOperator( - task_id='compare_result', python_callable=compare_result_fn, trigger_rule="all_done" ) - compare_result << [hive_show_table, hive_s3_location] - options = ['hadoop_jar_cmd', 'presto_cmd', 'db_query', 'spark_cmd'] branching = BranchPythonOperator(task_id='branching', python_callable=lambda: random.choice(options)) - branching << compare_result + [hive_show_table, hive_s3_location] >> compare_result(hive_s3_location, hive_show_table) >> branching join = DummyOperator(task_id='join', trigger_rule='one_success') @@ -131,11 +121,9 @@ with DAG( command_type="pigcmd", script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig", parameters="key1=value1 key2=value2", - trigger_rule="all_done", ) - pig_cmd << hadoop_jar_cmd << branching - pig_cmd >> join + branching >> hadoop_jar_cmd >> pig_cmd >> join presto_cmd = QuboleOperator(task_id='presto_cmd', command_type='prestocmd', query='show tables') @@ -144,11 +132,9 @@ with DAG( command_type="shellcmd", script_location="s3://public-qubole/qbol-library/scripts/shellx.sh", parameters="param1 param2", - trigger_rule="all_done", ) - shell_cmd << presto_cmd << branching - shell_cmd >> join + branching >> presto_cmd >> shell_cmd >> join db_query = QuboleOperator( task_id='db_query', command_type='dbtapquerycmd', query='show tables', db_tap_id=2064 @@ -162,11 +148,9 @@ with DAG( db_table='exported_airline_origin_destination', partition_spec='dt=20110104-02', dbtap_id=2064, - trigger_rule="all_done", ) - db_export << db_query << branching - db_export >> join + branching >> db_query >> db_export >> join db_import = QuboleOperator( task_id='db_import', @@ -177,7 +161,6 @@ with DAG( where_clause='id < 10', parallelism=2, dbtap_id=2064, - 
trigger_rule="all_done", ) prog = ''' @@ -213,12 +196,10 @@ with DAG( tags='airflow_example_run', ) - spark_cmd << db_import << branching - spark_cmd >> join + branching >> db_import >> spark_cmd >> join with DAG( dag_id='example_qubole_sensor', - default_args=default_args, schedule_interval=None, start_date=days_ago(2), doc_md=__doc__, diff --git a/airflow/providers/sqlite/example_dags/create_table.sql b/airflow/providers/sqlite/example_dags/create_table.sql new file mode 100644 index 0000000..0468338 --- /dev/null +++ b/airflow/providers/sqlite/example_dags/create_table.sql @@ -0,0 +1,24 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+*/ + +CREATE TABLE Items ( + item_id INT PRIMARY KEY, + name TEXT, + description TEXT +) diff --git a/airflow/providers/sqlite/example_dags/example_sqlite.py b/airflow/providers/sqlite/example_dags/example_sqlite.py index 2ff5114..5799871 100644 --- a/airflow/providers/sqlite/example_dags/example_sqlite.py +++ b/airflow/providers/sqlite/example_dags/example_sqlite.py @@ -24,7 +24,6 @@ The second task is similar but instead calls the SQL command from an external fi """ from airflow import DAG -from airflow.operators.python import PythonOperator from airflow.providers.sqlite.hooks.sqlite import SqliteHook from airflow.providers.sqlite.operators.sqlite import SqliteOperator from airflow.utils.dates import days_ago @@ -41,12 +40,11 @@ dag = DAG( # Example of creating a task that calls a common CREATE TABLE sql command. create_table_sqlite_task = SqliteOperator( task_id='create_table_sqlite', - sqlite_conn_id='sqlite_conn_id', sql=r""" - CREATE TABLE table_name ( - column_1 string, - column_2 string, - column_3 string + CREATE TABLE Customers ( + customer_id INT PRIMARY KEY, + first_name TEXT, + last_name TEXT ); """, dag=dag, @@ -55,37 +53,32 @@ create_table_sqlite_task = SqliteOperator( # [END howto_operator_sqlite] +@dag.task(task_id="insert_sqlite_task") def insert_sqlite_hook(): sqlite_hook = SqliteHook("sqlite_default") - sqlite_hook.get_conn() rows = [('James', '11'), ('James', '22'), ('James', '33')] target_fields = ['first_name', 'last_name'] - sqlite_hook.insert_rows(table='Customer', rows=rows, target_fields=target_fields) + sqlite_hook.insert_rows(table='Customers', rows=rows, target_fields=target_fields) +@dag.task(task_id="replace_sqlite_task") def replace_sqlite_hook(): sqlite_hook = SqliteHook("sqlite_default") - sqlite_hook.get_conn() rows = [('James', '11'), ('James', '22'), ('James', '33')] target_fields = ['first_name', 'last_name'] - sqlite_hook.insert_rows(table='Customer', rows=rows, target_fields=target_fields, replace=True) + 
sqlite_hook.insert_rows(table='Customers', rows=rows, target_fields=target_fields, replace=True) -insert_sqlite_task = PythonOperator(task_id="insert_sqlite_task", python_callable=insert_sqlite_hook) -replace_sqlite_task = PythonOperator(task_id="replace_sqlite_task", python_callable=replace_sqlite_hook) - # [START howto_operator_sqlite_external_file] # Example of creating a task that calls an sql command from an external file. external_create_table_sqlite_task = SqliteOperator( task_id='create_table_sqlite_external_file', - sqlite_conn_id='sqlite_conn_id', - sql='/scripts/create_table.sql', - dag=dag, + sql='create_table.sql', ) # [END howto_operator_sqlite_external_file] -create_table_sqlite_task >> external_create_table_sqlite_task >> insert_sqlite_task >> replace_sqlite_task +create_table_sqlite_task >> external_create_table_sqlite_task >> insert_sqlite_hook() >> replace_sqlite_hook()