potiuk commented on code in PR #25890:
URL: https://github.com/apache/airflow/pull/25890#discussion_r954799592


##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Lets look at another example; we need to get some data from a file which is 
hosted online and insert it into our local database. We also need to look at 
removing duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start 
docker-compose installation 
<https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for 
this example.
+The steps below should be sufficient, but see the quick-start documentation 
for full instructions.
+
+.. code-block:: bash
+
+  # Download the docker-compose.yaml file
+  curl -LfO 
'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+
+  # Make expected directories and set an expected environment variable
+  mkdir -p ./dags ./logs ./plugins
+  echo -e "AIRFLOW_UID=$(id -u)" > .env
+
+  # Initialize the database
+  docker-compose up airflow-init
+
+  # Start up all services
+  docker-compose up
+
+After all services have started up, the web UI will be available at: 
``http://localhost:8080``. The default account has the username ``airflow`` and 
the password ``airflow``.
+
+We will also need to create a `connection 
<https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_
 to the postgres db. To create one via the web UI, from the "Admin" menu, 
select "Connections", then click the Plus sign to "Add a new record" to the 
list of connections.
+
+Fill in the fields as shown below. Note the Connection Id value, which we'll 
pass as a parameter for the ``postgres_conn_id`` kwarg.
+
+- Connection Id: tutorial_pg_conn
+- Connection Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+Test your connection and if the test is successful, save your connection.
+
+Table Creation Tasks
+--------------------
+
+We can use the `PostgresOperator 
<https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_
 to define tasks that create tables in our postgres db.
+
+We'll create one table to facilitate data cleaning steps (``employees_temp``) 
and another table to store our cleaned data (``employees``).
+
+.. code-block:: python
+
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          CREATE TABLE IF NOT EXISTS employees (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+  create_employees_temp_table = PostgresOperator(
+      task_id="create_employees_temp_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          DROP TABLE IF EXISTS employees_temp;
+          CREATE TABLE employees_temp (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+Optional: Using SQL From Files
+------------------------------
+
+If you want to abstract these sql statements out of your DAG, you can move the 
statements sql files somewhere within the ``dags/`` directory and pass the sql 
file_path (relative to ``dags/``) to the ``sql`` kwarg. For ``employees`` for 
example, create a ``sql`` directory in ``dags/``, put ``employees`` DDL in 
``dags/sql/employees_schema.sql``, and modify the PostgresOperator() to:
+
+.. code-block:: python
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="sql/employees_schema.sql",
+  )
+
+and repeat for the ``employees_temp`` table.
+
+Data Retrieval Task
+-------------------
+
+Here we retrieve data, save it to a file on our Airflow instance, and load the 
data from that file into an intermediate table where we can execute data 
cleaning steps.
+
+.. code-block:: python
+
+  import os
+  import requests
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def get_data():
+      # NOTE: configure this as appropriate for your airflow environment
+      data_path = "/opt/airflow/dags/files/employees.csv"
+      os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+      url = 
"https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv";
+
+      response = requests.request("GET", url)
+
+      with open(data_path, "w") as file:
+          file.write(response.text)
+
+      postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+      conn = postgres_hook.get_conn()
+      cur = conn.cursor()
+      with open(data_path, "r") as file:
+          cur.copy_expert(
+              "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' 
QUOTE '\"'",
+              file,
+          )
+      conn.commit()
+
+Data Merge Task
+---------------
+
+Here we select completely unique records from the retrieved data, then we 
check to see if any employee ``Serial Numbers`` are already in the database (if 
they are, we update those records with the new data).
+
+.. code-block:: python
+
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def merge_data():
+      query = """
+          INSERT INTO employees
+          SELECT *
+          FROM (
+              SELECT DISTINCT *
+              FROM employees_temp
+          )
+          ON CONFLICT ("Serial Number") DO UPDATE
+          SET "Serial Number" = excluded."Serial Number";
+      """
+      try:
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          cur.execute(query)
+          conn.commit()
+          return 0
+      except Exception as e:
+          return 1
+
+
+
+Completing our DAG
+------------------
+
+We've developed our tasks, now we need to wrap them in a DAG, which enables us 
to define when and how tasks should run, and state any dependencies that tasks 
have on other tasks. The DAG below is configured to:
+
+* run every day at midnight starting on Jan 1, 2021,
+* only run once in the event that days are missed, and
+* timeout after 60 minutes
+
+And from the last line in the definition of the ``Etl`` DAG, we see:

Review Comment:
   I think we should get rid of ETL here (and also in the examples - including 
the TaskFlow tutorial. 
   
   This is something that always bothered me, even if you do ETL with Airflow, 
you usually do it completely differently than described in this tutorial. 
   
   I do not think ETL is as "catchy" thing that was when the documentation was 
initially created, and I see no harm with completely getting rid of it here. We 
should stop pretending this is somewhat "real" ETL example that people should 
follow - because it is as far from the real ETL as it can be.
   



##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Lets look at another example; we need to get some data from a file which is 
hosted online and insert it into our local database. We also need to look at 
removing duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start 
docker-compose installation 
<https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for 
this example.
+The steps below should be sufficient, but see the quick-start documentation 
for full instructions.
+
+.. code-block:: bash
+
+  # Download the docker-compose.yaml file
+  curl -LfO 
'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+
+  # Make expected directories and set an expected environment variable
+  mkdir -p ./dags ./logs ./plugins
+  echo -e "AIRFLOW_UID=$(id -u)" > .env
+
+  # Initialize the database
+  docker-compose up airflow-init
+
+  # Start up all services
+  docker-compose up
+
+After all services have started up, the web UI will be available at: 
``http://localhost:8080``. The default account has the username ``airflow`` and 
the password ``airflow``.
+
+We will also need to create a `connection 
<https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_
 to the postgres db. To create one via the web UI, from the "Admin" menu, 
select "Connections", then click the Plus sign to "Add a new record" to the 
list of connections.
+
+Fill in the fields as shown below. Note the Connection Id value, which we'll 
pass as a parameter for the ``postgres_conn_id`` kwarg.
+
+- Connection Id: tutorial_pg_conn
+- Connection Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+Test your connection and if the test is successful, save your connection.
+
+Table Creation Tasks
+--------------------
+
+We can use the `PostgresOperator 
<https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_
 to define tasks that create tables in our postgres db.
+
+We'll create one table to facilitate data cleaning steps (``employees_temp``) 
and another table to store our cleaned data (``employees``).
+
+.. code-block:: python
+
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          CREATE TABLE IF NOT EXISTS employees (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+  create_employees_temp_table = PostgresOperator(
+      task_id="create_employees_temp_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          DROP TABLE IF EXISTS employees_temp;
+          CREATE TABLE employees_temp (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+Optional: Using SQL From Files
+------------------------------
+
+If you want to abstract these sql statements out of your DAG, you can move the 
statements sql files somewhere within the ``dags/`` directory and pass the sql 
file_path (relative to ``dags/``) to the ``sql`` kwarg. For ``employees`` for 
example, create a ``sql`` directory in ``dags/``, put ``employees`` DDL in 
``dags/sql/employees_schema.sql``, and modify the PostgresOperator() to:
+
+.. code-block:: python
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="sql/employees_schema.sql",
+  )
+
+and repeat for the ``employees_temp`` table.
+
+Data Retrieval Task
+-------------------
+
+Here we retrieve data, save it to a file on our Airflow instance, and load the 
data from that file into an intermediate table where we can execute data 
cleaning steps.
+
+.. code-block:: python
+
+  import os
+  import requests
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def get_data():
+      # NOTE: configure this as appropriate for your airflow environment
+      data_path = "/opt/airflow/dags/files/employees.csv"
+      os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+      url = 
"https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv";
+
+      response = requests.request("GET", url)
+
+      with open(data_path, "w") as file:
+          file.write(response.text)
+
+      postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+      conn = postgres_hook.get_conn()
+      cur = conn.cursor()
+      with open(data_path, "r") as file:
+          cur.copy_expert(
+              "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' 
QUOTE '\"'",
+              file,
+          )
+      conn.commit()
+
+Data Merge Task
+---------------
+
+Here we select completely unique records from the retrieved data, then we 
check to see if any employee ``Serial Numbers`` are already in the database (if 
they are, we update those records with the new data).
+
+.. code-block:: python
+
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def merge_data():
+      query = """
+          INSERT INTO employees
+          SELECT *
+          FROM (
+              SELECT DISTINCT *
+              FROM employees_temp
+          )
+          ON CONFLICT ("Serial Number") DO UPDATE
+          SET "Serial Number" = excluded."Serial Number";
+      """
+      try:
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          cur.execute(query)
+          conn.commit()
+          return 0
+      except Exception as e:
+          return 1
+
+
+
+Completing our DAG
+------------------
+
+We've developed our tasks, now we need to wrap them in a DAG, which enables us 
to define when and how tasks should run, and state any dependencies that tasks 
have on other tasks. The DAG below is configured to:
+
+* run every day at midnight starting on Jan 1, 2021,
+* only run once in the event that days are missed, and
+* timeout after 60 minutes
+
+And from the last line in the definition of the ``Etl`` DAG, we see:
+
+.. code-block:: python
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> 
merge_data()
+
+* the ``merge_data()`` task depends on the ``get_data()`` task,
+* the ``get_data()`` depends on both the ``create_employees_table`` and 
``create_employees_temp_table`` tasks, and
+* the ``create_employees_table`` and ``create_employees_temp_table`` tasks can 
run independently.
+
+Putting all of the pieces together, we have our completed DAG.
+
+.. code-block:: python
+
+  import datetime
+  import pendulum
+  import os
+
+  import requests
+  from airflow.decorators import dag, task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+
+  @dag(
+      schedule="0 0 * * *",
+      start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+      catchup=False,
+      dagrun_timeout=datetime.timedelta(minutes=60),
+  )
+  def Etl():
+      create_employees_table = PostgresOperator(
+          task_id="create_employees_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              CREATE TABLE IF NOT EXISTS employees (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      create_employees_temp_table = PostgresOperator(
+          task_id="create_employees_temp_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              DROP TABLE IF EXISTS employees_temp;
+              CREATE TABLE employees_temp (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      @task
+      def get_data():
+          # NOTE: configure this as appropriate for your airflow environment
+          data_path = "/opt/airflow/dags/files/employees.csv"
+          os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+          url = 
"https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv";
+
+          response = requests.request("GET", url)
+
+          with open(data_path, "w") as file:
+              file.write(response.text)
+
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          with open(data_path, "r") as file:
+              cur.copy_expert(
+                  "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS 
',' QUOTE '\"'",
+                  file,
+              )
+          conn.commit()
+
+      @task
+      def merge_data():
+          query = """
+              INSERT INTO employees
+              SELECT *
+              FROM (
+                  SELECT DISTINCT *
+                  FROM employees_temp
+              )
+              ON CONFLICT ("Serial Number") DO UPDATE
+              SET "Serial Number" = excluded."Serial Number";
+          """
+          try:
+              postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+              conn = postgres_hook.get_conn()
+              cur = conn.cursor()
+              cur.execute(query)
+              conn.commit()
+              return 0
+          except Exception as e:
+              return 1
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> 
merge_data()
+
+
+  dag = Etl()
+
+Save this code to a python file in the ``/dags`` folder (e.g. ``dags/etl.py``) 
and (after a `brief delay 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-dir-list-interval>`_),
 the ``Etl`` DAG will be included in the list of available DAGs on the web UI.
+
+.. image:: ../img/new_tutorial-1.png
+
+You can trigger the ``Etl`` DAG by unpausing it (via the slider on the left 
end) and running it (via the Run button under **Actions**).
+
+.. image:: ../img/new_tutorial-3.png

Review Comment:
   This image and the description should be re-done. It has TreeView which is 
gone now.



##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Lets look at another example; we need to get some data from a file which is 
hosted online and insert it into our local database. We also need to look at 
removing duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start 
docker-compose installation 
<https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for 
this example.

Review Comment:
   This is direct URL which is wrong . It should point to the moved page using 
:doc: directive and different name (it's not a quick-start any more).



##########
docs/apache-airflow/tutorial/pipeline.rst:
##########
@@ -0,0 +1,312 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+
+Building a Running Pipeline
+===========================
+
+Lets look at another example; we need to get some data from a file which is 
hosted online and insert it into our local database. We also need to look at 
removing duplicate rows while inserting.
+
+Initial setup
+-------------
+
+We need to have Docker installed as we will be using the `quick-start 
docker-compose installation 
<https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html>`_ for 
this example.
+The steps below should be sufficient, but see the quick-start documentation 
for full instructions.
+
+.. code-block:: bash
+
+  # Download the docker-compose.yaml file
+  curl -LfO 
'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
+
+  # Make expected directories and set an expected environment variable
+  mkdir -p ./dags ./logs ./plugins
+  echo -e "AIRFLOW_UID=$(id -u)" > .env
+
+  # Initialize the database
+  docker-compose up airflow-init
+
+  # Start up all services
+  docker-compose up
+
+After all services have started up, the web UI will be available at: 
``http://localhost:8080``. The default account has the username ``airflow`` and 
the password ``airflow``.
+
+We will also need to create a `connection 
<https://airflow.apache.org/docs/apache-airflow/stable/concepts/connections.html>`_
 to the postgres db. To create one via the web UI, from the "Admin" menu, 
select "Connections", then click the Plus sign to "Add a new record" to the 
list of connections.
+
+Fill in the fields as shown below. Note the Connection Id value, which we'll 
pass as a parameter for the ``postgres_conn_id`` kwarg.
+
+- Connection Id: tutorial_pg_conn
+- Connection Type: postgres
+- Host: postgres
+- Schema: airflow
+- Login: airflow
+- Password: airflow
+- Port: 5432
+
+Test your connection and if the test is successful, save your connection.
+
+Table Creation Tasks
+--------------------
+
+We can use the `PostgresOperator 
<https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html#creating-a-postgres-database-table>`_
 to define tasks that create tables in our postgres db.
+
+We'll create one table to facilitate data cleaning steps (``employees_temp``) 
and another table to store our cleaned data (``employees``).
+
+.. code-block:: python
+
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          CREATE TABLE IF NOT EXISTS employees (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+  create_employees_temp_table = PostgresOperator(
+      task_id="create_employees_temp_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="""
+          DROP TABLE IF EXISTS employees_temp;
+          CREATE TABLE employees_temp (
+              "Serial Number" NUMERIC PRIMARY KEY,
+              "Company Name" TEXT,
+              "Employee Markme" TEXT,
+              "Description" TEXT,
+              "Leave" INTEGER
+          );""",
+  )
+
+Optional: Using SQL From Files
+------------------------------
+
+If you want to abstract these sql statements out of your DAG, you can move the 
statements sql files somewhere within the ``dags/`` directory and pass the sql 
file_path (relative to ``dags/``) to the ``sql`` kwarg. For ``employees`` for 
example, create a ``sql`` directory in ``dags/``, put ``employees`` DDL in 
``dags/sql/employees_schema.sql``, and modify the PostgresOperator() to:
+
+.. code-block:: python
+
+  create_employees_table = PostgresOperator(
+      task_id="create_employees_table",
+      postgres_conn_id="tutorial_pg_conn",
+      sql="sql/employees_schema.sql",
+  )
+
+and repeat for the ``employees_temp`` table.
+
+Data Retrieval Task
+-------------------
+
+Here we retrieve data, save it to a file on our Airflow instance, and load the 
data from that file into an intermediate table where we can execute data 
cleaning steps.
+
+.. code-block:: python
+
+  import os
+  import requests
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def get_data():
+      # NOTE: configure this as appropriate for your airflow environment
+      data_path = "/opt/airflow/dags/files/employees.csv"
+      os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+      url = 
"https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv";
+
+      response = requests.request("GET", url)
+
+      with open(data_path, "w") as file:
+          file.write(response.text)
+
+      postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+      conn = postgres_hook.get_conn()
+      cur = conn.cursor()
+      with open(data_path, "r") as file:
+          cur.copy_expert(
+              "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS ',' 
QUOTE '\"'",
+              file,
+          )
+      conn.commit()
+
+Data Merge Task
+---------------
+
+Here we select completely unique records from the retrieved data, then we 
check to see if any employee ``Serial Numbers`` are already in the database (if 
they are, we update those records with the new data).
+
+.. code-block:: python
+
+  from airflow.decorators import task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+  @task
+  def merge_data():
+      query = """
+          INSERT INTO employees
+          SELECT *
+          FROM (
+              SELECT DISTINCT *
+              FROM employees_temp
+          )
+          ON CONFLICT ("Serial Number") DO UPDATE
+          SET "Serial Number" = excluded."Serial Number";
+      """
+      try:
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          cur.execute(query)
+          conn.commit()
+          return 0
+      except Exception as e:
+          return 1
+
+
+
+Completing our DAG
+------------------
+
+We've developed our tasks, now we need to wrap them in a DAG, which enables us 
to define when and how tasks should run, and state any dependencies that tasks 
have on other tasks. The DAG below is configured to:
+
+* run every day at midnight starting on Jan 1, 2021,
+* only run once in the event that days are missed, and
+* timeout after 60 minutes
+
+And from the last line in the definition of the ``Etl`` DAG, we see:
+
+.. code-block:: python
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> 
merge_data()
+
+* the ``merge_data()`` task depends on the ``get_data()`` task,
+* the ``get_data()`` depends on both the ``create_employees_table`` and 
``create_employees_temp_table`` tasks, and
+* the ``create_employees_table`` and ``create_employees_temp_table`` tasks can 
run independently.
+
+Putting all of the pieces together, we have our completed DAG.
+
+.. code-block:: python
+
+  import datetime
+  import pendulum
+  import os
+
+  import requests
+  from airflow.decorators import dag, task
+  from airflow.providers.postgres.hooks.postgres import PostgresHook
+  from airflow.providers.postgres.operators.postgres import PostgresOperator
+
+
+  @dag(
+      schedule="0 0 * * *",
+      start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+      catchup=False,
+      dagrun_timeout=datetime.timedelta(minutes=60),
+  )
+  def Etl():
+      create_employees_table = PostgresOperator(
+          task_id="create_employees_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              CREATE TABLE IF NOT EXISTS employees (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      create_employees_temp_table = PostgresOperator(
+          task_id="create_employees_temp_table",
+          postgres_conn_id="tutorial_pg_conn",
+          sql="""
+              DROP TABLE IF EXISTS employees_temp;
+              CREATE TABLE employees_temp (
+                  "Serial Number" NUMERIC PRIMARY KEY,
+                  "Company Name" TEXT,
+                  "Employee Markme" TEXT,
+                  "Description" TEXT,
+                  "Leave" INTEGER
+              );""",
+      )
+
+      @task
+      def get_data():
+          # NOTE: configure this as appropriate for your airflow environment
+          data_path = "/opt/airflow/dags/files/employees.csv"
+          os.makedirs(os.path.dirname(data_path), exist_ok=True)
+
+          url = 
"https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/tutorial/pipeline_example.csv";
+
+          response = requests.request("GET", url)
+
+          with open(data_path, "w") as file:
+              file.write(response.text)
+
+          postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+          conn = postgres_hook.get_conn()
+          cur = conn.cursor()
+          with open(data_path, "r") as file:
+              cur.copy_expert(
+                  "COPY employees_temp FROM STDIN WITH CSV HEADER DELIMITER AS 
',' QUOTE '\"'",
+                  file,
+              )
+          conn.commit()
+
+      @task
+      def merge_data():
+          query = """
+              INSERT INTO employees
+              SELECT *
+              FROM (
+                  SELECT DISTINCT *
+                  FROM employees_temp
+              )
+              ON CONFLICT ("Serial Number") DO UPDATE
+              SET "Serial Number" = excluded."Serial Number";
+          """
+          try:
+              postgres_hook = PostgresHook(postgres_conn_id="tutorial_pg_conn")
+              conn = postgres_hook.get_conn()
+              cur = conn.cursor()
+              cur.execute(query)
+              conn.commit()
+              return 0
+          except Exception as e:
+              return 1
+
+      [create_employees_table, create_employees_temp_table] >> get_data() >> 
merge_data()
+
+
+  dag = Etl()
+
+Save this code to a python file in the ``/dags`` folder (e.g. ``dags/etl.py``) 
and (after a `brief delay 
<https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-dir-list-interval>`_),
 the ``Etl`` DAG will be included in the list of available DAGs on the web UI.
+
+.. image:: ../img/new_tutorial-1.png
+
+You can trigger the ``Etl`` DAG by unpausing it (via the slider on the left 
end) and running it (via the Run button under **Actions**).
+
+.. image:: ../img/new_tutorial-3.png
+
+In the ``Etl`` DAG's **Tree** view, we see all that all tasks ran successfully 
in all executed runs. Success!

Review Comment:
   We should have a "What's Next?" section here as well. 



##########
docs/apache-airflow/tutorial/fundamentals.rst:
##########
@@ -107,7 +107,7 @@ Operators
 
 An operator defines a unit of work for Airflow to complete. Using operators is 
the classic approach
 to defining work in Airflow. For some use cases, it's better to use the 
TaskFlow API to define
-work in a Pythonic context as described in :doc:`/tutorial_taskflow_api`. For 
now, using operators helps to
+work in a Pythonic context as described in :doc:`taskflow`. For now, using 
operators helps to

Review Comment:
   I think I would use a stronger wording here in favour of Task Flow and make 
it more of a "branch off" point here. 
   I believe we should put more effort  here in explaining WHEN TaskFlow is 
better and why people might want to use it - without them going to see the 
tutorial yet. This is - I think - the best place to do that and have a quick 
overview of where classic operators are better and where TaskFlow approach is. 
The current description does not help to make a decision for the reader, they 
have to read both tutorials to decide which one is better and since the 
"taskflow" is a sepearate doc, they will naturally go and check it less 
frequently, without realising that for them maybe TaskFlow-first is better. 
approach.
   
   I think when somone gets to this place in the tutorial, they should be 
presented with two options (broadly speaking it shoudl likely be formulated 
better) :
   
   1) If you are mostly interested in using the broad set of existing 
integrations and you are not interested in writing a lot of Python code and 
your Python experience is limitied, then likely chosing "classic"  black-box 
operator approach is easier to start with and you should continue reading, But 
checking out of Taskflow once you get familiar with classic operators should 
also be recommmended and we should add "taskfllow tutorial to "what's next" 
section at the end.
   
   2) But if you are experienced Python developer and your case is to create 
more customised and flexible pipelines that could also use the integrations but 
do not fall into the black-box operator approach, TaskFlow is for you and you'd 
do better starting with it rather than continuing to read the "classic" 
approach (you might still come back to it, but Taskflow-first should be the 
first thing to read so that you do not imprint the "classic" approach. DAGs 
written in TaskFlow look very differently from the Classic ones and for a 
number of people, unlearning what they've seen with the classic DAGs might be 
quite difficult.
   
   I think we should also mention the "classic" vs. "modern"  approach or be a 
bit more stonger on tasfklow being a more "Pythonic". Not sure about the 
wording. I think this approach will be much better in promoting the "taskflow" 
approach to more "Pythonic" users, who might be put off by the "classic" 
apprroach (when you look at the taskflow, and you are experienced Python 
developer, the classic approach simply looks terribly ugly and  kinda "you 
don't do it in Python". So those users should be made aware they do not havet 
to learn the classic way to use Airflow - and even discourage them from doing 
so.
   
   I think the words "black-box" vs. "flexible and Pythonic" very well 
describes the difference between the two. Both have benefits for different 
kinds of users and I think being able to respond to both of them here by 
properly redirecting them is crucial.



##########
docs/apache-airflow/tutorial/fundamentals.rst:
##########
@@ -694,7 +403,7 @@ Here's a few things you might want to do next:
     - Take an in-depth tour of the UI - click all the things!
     - Keep reading the docs!
 
-      - Review the :doc:`how-to guides<howto/index>`, which include a guide to 
writing your own operator
+      - Review the :doc:`how-to guides</howto/index>`, which include a guide 
to writing your own operator
       - Review the :ref:`Command Line Interface Reference<cli>`
       - Review the :ref:`List of operators <pythonapi:operators>`
       - Review the :ref:`Macros reference<macros>`

Review Comment:
   I thin we need to add "TaskFlow tutorial" here with explanation why you want 
to do it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to