turbaszek commented on a change in pull request #18278:
URL: https://github.com/apache/airflow/pull/18278#discussion_r709593587
##########
File path: airflow/providers/apache/hive/example_dags/example_twitter_dag.py
##########
@@ -31,92 +31,61 @@
 from datetime import date, timedelta
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 from airflow.providers.apache.hive.operators.hive import HiveOperator
 from airflow.utils.dates import days_ago
-# --------------------------------------------------------------------------------
-# Create a few placeholder scripts. In practice these would be different python
-# script files, which are imported in this section with absolute or relative imports
-# --------------------------------------------------------------------------------
-
-def fetchtweets():
+@task
+def fetch_tweets():
     """
-    This is a placeholder for fetchtweets.
+    This task should call the Twitter API and retrieve yesterday's tweets sent from and to the four
+    Twitter users (Twitter_A, ..., Twitter_D). It should generate eight CSV output files named with the
+    convention direction(from or to)_twitterHandle_date.csv.
     """
-def cleantweets():
+@task
+def clean_tweets():
     """
-    This is a placeholder for cleantweets.
+    This is a placeholder to clean the eight files. In this step you can drop or cherry-pick
+    columns and different parts of the text.
     """
-def analyzetweets():
+@task
+def analyze_tweets():
     """
-    This is a placeholder for analyzetweets.
+    This is a placeholder to analyze the Twitter data. It could be a simple sentiment analysis
+    using an algorithm like bag-of-words, or something more sophisticated. You could also look at
+    web services that perform such tasks.
     """
-def transfertodb():
+@task
+def transfer_to_db():
     """
-    This is a placeholder for transfertodb.
+    This is a placeholder to extract a summary from the Hive data and store it in MySQL.
     """
 with DAG(
     dag_id='example_twitter_dag',
     default_args={
         'owner': 'Ekhtiar',
-        'depends_on_past': False,
-        'email': ['[email protected]'],
-        'email_on_failure': False,
-        'email_on_retry': False,
         'retries': 1,
-        'retry_delay': timedelta(minutes=5),
     },
     schedule_interval="@daily",
     start_date=days_ago(5),
     tags=['example'],
 ) as dag:
+    clean_tweets = clean_tweets()
+    analyze_tweets = analyze_tweets()
+    hive_to_mysql = transfer_to_db()
-    # --------------------------------------------------------------------------------
-    # This task should call Twitter API and retrieve tweets from yesterday from and to
-    # for the four twitter users (Twitter_A,..,Twitter_D) There should be eight csv
-    # output files generated by this task and naming convention
-    # is direction(from or to)_twitterHandle_date.csv
-    # --------------------------------------------------------------------------------
-
-    fetch_tweets = PythonOperator(task_id='fetch_tweets', python_callable=fetchtweets)
-
-    # --------------------------------------------------------------------------------
-    # Clean the eight files. In this step you can get rid of or cherry pick columns
-    # and different parts of the text
-    # --------------------------------------------------------------------------------
-
-    clean_tweets = PythonOperator(task_id='clean_tweets', python_callable=cleantweets)
-
-    clean_tweets << fetch_tweets
-
-    # --------------------------------------------------------------------------------
-    # In this section you can use a script to analyze the twitter data. Could simply
-    # be a sentiment analysis through algorithms like bag of words or something more
-    # complicated. You can also take a look at Web Services to do such tasks
-    # --------------------------------------------------------------------------------
-
-    analyze_tweets = PythonOperator(task_id='analyze_tweets', python_callable=analyzetweets)
-
-    analyze_tweets << clean_tweets
-
-    # --------------------------------------------------------------------------------
-    # Although this is the last task, we need to declare it before the next tasks as we
-    # will use set_downstream This task will extract summary from Hive data and store
-    # it to MySQL
-    # --------------------------------------------------------------------------------
-
-    hive_to_mysql = PythonOperator(task_id='hive_to_mysql', python_callable=transfertodb)
+    fetch_tweets() >> clean_tweets
Review comment:
Let's initialize this fetch task before chaining, as we do with the others. It would be more consistent.
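
For example, something like this (a sketch only; `fetched_tweets` is an illustrative name, not part of the PR):

    # instantiate every task up front, like the other three
    fetched_tweets = fetch_tweets()
    clean_tweets = clean_tweets()
    analyze_tweets = analyze_tweets()
    hive_to_mysql = transfer_to_db()

    # then declare the dependency between the instantiated tasks
    fetched_tweets >> clean_tweets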
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]