This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 2cc9ed00b22245fcc9c9feb4dac36817a1447ff5
Author: Kian Yang Lee <[email protected]>
AuthorDate: Wed Oct 27 02:37:01 2021 +0800

    Doc: Improve tutorial documentation and code (#19186)

    1. Added instructions on adding a postgres connection.
    2. Modified SQL to use proper syntax.
    3. Removed redundant lines when writing to CSV.
    4. Added QUOTE argument for copy_expert.

    (cherry picked from commit f83099cd4c2eaecfd363fce4562aa19e975c146c)
---
 docs/apache-airflow/tutorial.rst | 99 ++++++++++++++++++++++------------------
 1 file changed, 55 insertions(+), 44 deletions(-)

diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index d9587bc..acb7e84 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -377,73 +377,86 @@ We need to have docker and postgres installed. We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_
 Follow the instructions properly to set up Airflow.
 
-Create a Employee table in postgres using this
+Create an Employee table in postgres using this:
 
 .. code-block:: sql
 
-  create table "Employees"
+  CREATE TABLE "Employees"
   (
-      "Serial Number" numeric not null
-          constraint employees_pk
-              primary key,
-      "Company Name" text,
-      "Employee Markme" text,
-      "Description" text,
-      "Leave" integer
+      "Serial Number" NUMERIC PRIMARY KEY,
+      "Company Name" TEXT,
+      "Employee Markme" TEXT,
+      "Description" TEXT,
+      "Leave" INTEGER
   );
 
-  create table "Employees_temp"
+  CREATE TABLE "Employees_temp"
   (
-      "Serial Number" numeric not null
-          constraint employees_temp_pk
-              primary key,
-      "Company Name" text,
-      "Employee Markme" text,
-      "Description" text,
-      "Leave" integer
+      "Serial Number" NUMERIC PRIMARY KEY,
+      "Company Name" TEXT,
+      "Employee Markme" TEXT,
+      "Description" TEXT,
+      "Leave" INTEGER
   );
 
+We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". Specify the following for each field:
+
+- Conn id: LOCAL
+- Conn Type: postgres
+- Host: postgres
+- Schema: <DATABASE_NAME>
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+After that, you can test your connection. If you followed all the steps correctly, it should show a success notification. Proceed with saving the connection; we are now ready to write the DAG.
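If you prefer the command line, the same connection can also be created with the Airflow CLI instead of the UI. This is a minimal sketch, assuming the ``airflow-scheduler`` service name from the referenced docker-compose file; ``<DATABASE_NAME>`` is the same placeholder as in the field list above.

.. code-block:: bash

    # Create the "LOCAL" postgres connection used by this tutorial.
    # The values mirror the UI fields listed above.
    docker-compose exec airflow-scheduler \
        airflow connections add 'LOCAL' \
            --conn-type 'postgres' \
            --conn-host 'postgres' \
            --conn-schema '<DATABASE_NAME>' \
            --conn-login 'airflow' \
            --conn-password 'airflow' \
            --conn-port '5432'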
 
 Let's break this down into 2 steps: get data & merge data:
 
 .. code-block:: python
 
+    from airflow.decorators import dag, task
+    from airflow.providers.postgres.hooks.postgres import PostgresHook
+    from datetime import datetime, timedelta
+    import requests
+
     @task
     def get_data():
-        data_path = "/usr/local/airflow/dags/files/employees.csv"
+        # NOTE: configure this as appropriate for your airflow environment
+        data_path = "/opt/airflow/dags/files/employees.csv"
        url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
 
         response = requests.request("GET", url)
 
         with open(data_path, "w") as file:
-            for row in response.text.split("\n"):
-                if row:
-                    file.write(row + "\n")
+            file.write(response.text)
 
         postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
         conn = postgres_hook.get_conn()
         cur = conn.cursor()
         with open(data_path, "r") as file:
             cur.copy_expert(
-                "COPY \"Employees_temp\" FROM stdin WITH CSV HEADER DELIMITER AS ','", file
+                "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
+                file,
             )
         conn.commit()
 
-Here we are passing a ``GET`` request to get the data from the URL and save it in ``employees.csv`` file on our Airflow instance and we are dumping the file into a temporary table before merging the data to the final employees table
+Here we send a ``GET`` request to fetch the data from the URL and save it in the ``employees.csv`` file on our Airflow instance. We then load the file into a temporary table before merging the data into the final employees table.
 
 .. code-block:: python
 
     @task
     def merge_data():
         query = """
-        delete
-        from "Employees" e using "Employees_temp" et
-        where e."Serial Number" = et."Serial Number";
+        DELETE FROM "Employees" e
+        USING "Employees_temp" et
+        WHERE e."Serial Number" = et."Serial Number";
 
-        insert into "Employees"
-        select *
-        from "Employees_temp";
+        INSERT INTO "Employees"
+        SELECT *
+        FROM "Employees_temp";
         """
         try:
             postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
@@ -455,7 +468,7 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i
         except Exception as e:
             return 1
 
-Here we are first looking for duplicate values and removing them before we insert new values in our final table
+Here we first remove any rows in the final table that would be duplicated by the incoming data (matching on "Serial Number"), and then insert the new values from the temporary table.
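Once the DAG shown below is in your dags folder, you can also exercise these two steps one at a time instead of waiting for the daily schedule. A minimal sketch, assuming the DAG id ``Etl`` (the default taken from the decorated function), the default task ids ``get_data`` and ``merge_data``, the ``airflow-scheduler`` service from the referenced docker-compose file, and an arbitrary example date:

.. code-block:: bash

    # Run each task once, in isolation, without scheduling a full DAG run.
    docker-compose exec airflow-scheduler airflow tasks test Etl get_data 2021-10-27
    docker-compose exec airflow-scheduler airflow tasks test Etl merge_data 2021-10-27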
 
 Lets look at our DAG:
 
@@ -478,23 +491,21 @@ Lets look at our DAG:
         @task
         def get_data():
             # NOTE: configure this as appropriate for your airflow environment
-            data_path = "/usr/local/airflow/dags/files/employees.csv"
+            data_path = "/opt/airflow/dags/files/employees.csv"
            url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
 
             response = requests.request("GET", url)
 
             with open(data_path, "w") as file:
-                for row in response.text.split("\n"):
-                    if row:
-                        file.write(row + "\n")
+                file.write(response.text)
 
             postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
             conn = postgres_hook.get_conn()
             cur = conn.cursor()
             with open(data_path, "r") as file:
                 cur.copy_expert(
-                    "COPY \"Employees_temp\" FROM stdin WITH CSV HEADER DELIMITER AS ','",
+                    "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'",
                     file,
                 )
             conn.commit()
 
@@ -502,13 +513,13 @@ Lets look at our DAG:
         @task
         def merge_data():
             query = """
-            delete
-            from "Employees" e using "Employees_temp" et
-            where e."Serial Number" = et."Serial Number";
+            DELETE FROM "Employees" e
+            USING "Employees_temp" et
+            WHERE e."Serial Number" = et."Serial Number";
 
-            insert into "Employees"
-            select *
-            from "Employees_temp";
+            INSERT INTO "Employees"
+            SELECT *
+            FROM "Employees_temp";
             """
             try:
                 postgres_hook = PostgresHook(postgres_conn_id="LOCAL")
 
@@ -526,14 +537,14 @@ Lets look at our DAG:
 
     dag = Etl()
 
 This dag runs daily at 00:00.
-Add this python file to airflow/dags folder (e.g. ``dags/etl.py``) and go back to the main folder and run
+Add this python file to the airflow/dags folder (e.g. ``dags/etl.py``), go back to the main folder, and run:
 
 .. code-block:: bash
 
     docker-compose up airflow-init
     docker-compose up
 
-Go to your browser and go to the site http://localhost:8080/home and trigger your DAG Airflow Example
+Go to your browser, open the site http://localhost:8080/home, and trigger your DAG Airflow Example:
 
 .. image:: img/new_tutorial-1.png
 
@@ -541,7 +552,7 @@ Go to your browser and go to the site http://localhost:8080/home and trigger you
 
 .. image:: img/new_tutorial-2.png
 
 The DAG ran successfully as we can see the green boxes. If there had been an error the boxes would be red.
-Before the DAG run my local table had 10 rows after the DAG run it had approx 100 rows
+Before the DAG run, my local table had 10 rows; after the DAG run, it had approximately 100 rows.
 
 What's Next?
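To reproduce the row counts quoted above (10 rows before the run, roughly 100 after), you can count the rows directly in the postgres container. A minimal sketch, assuming the ``postgres`` service name and ``airflow`` user from the referenced docker-compose file; ``<DATABASE_NAME>`` is the same placeholder as in the connection settings:

.. code-block:: bash

    # Count the rows in the final table before and after triggering the DAG.
    docker-compose exec postgres \
        psql -U airflow -d '<DATABASE_NAME>' \
            -c 'SELECT COUNT(*) FROM "Employees";'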
