mobuchowski opened a new issue, #38772:
URL: https://github.com/apache/airflow/issues/38772

   ### Body
   
   Make sure the end-to-end DAG example works and emits proper OpenLineage events.
   
   Take special care to make sure dataset naming is consistent between the Hook-sourced lineage from the PythonOperator and the implementations in the individual S3 operators, S3DeleteObjectsOperator and S3CopyObjectOperator.
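   
   For reference, a minimal sketch of the dataset identity both code paths would need to agree on, assuming the OpenLineage S3 naming convention of namespace `s3://{bucket}` and name equal to the object key (the import path is from the openlineage-python client and the values are illustrative, not the provider's actual extractor output):
   
   ```
   # Illustrative only: the (namespace, name) pair that both the hook-sourced
   # lineage and the S3 operator extractors should emit for the same object.
   from openlineage.client.run import Dataset
   
   BUCKET = 'astro-workshop-bucket'
   key = 'globetelecom/workshop_file_0.txt'
   
   # What the PythonOperator task (via S3Hook.load_file) should report as its output
   hook_output = Dataset(namespace='s3://{0}'.format(BUCKET), name=key)
   
   # What S3CopyObjectOperator should report as its input for the same object
   copy_input = Dataset(namespace='s3://{0}'.format(BUCKET), name=key)
   
   assert (hook_output.namespace, hook_output.name) == (copy_input.namespace, copy_input.name)
   ```
   
   The example DAG: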
   ```
   """
   ### Copy and Delete Data in S3
   
   Upload files to an S3 bucket, copy them to another key, and then delete the copies.
   
   This DAG shows how to use a Python function that calls the S3 hook to generate and upload files to S3, which are then copied and deleted.
   Additionally, this DAG is written to work with the Kubernetes Executor, so custom resource requests are set.
   
   
   This DAG was part of an Intro to Airflow Workshop. 
   
   """
   
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.python import PythonOperator
   from airflow.providers.amazon.aws.hooks.s3 import S3Hook
   from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator, S3DeleteObjectsOperator
   
   
   S3_CONN_ID = 'astro-s3-workshop'
   BUCKET = 'astro-workshop-bucket'
   
   name = 'workshop'
   
   
   def upload_to_s3(file_name):
   
       # Instantiate the S3 hook
       s3_hook = S3Hook(aws_conn_id=S3_CONN_ID)
   
       # Create a local file to upload
       sample_file = "{0}_file_{1}.txt".format(name, file_name)  # swap your name here
       with open(sample_file, "w") as example_file:
           example_file.write("Putting some data in for task {0}".format(file_name))
   
       s3_hook.load_file(sample_file, 'globetelecom/{0}'.format(sample_file), bucket_name=BUCKET, replace=True)
   
       
   
   
   # Default settings applied to all tasks
   default_args = {
       'owner': 'airflow',
       'depends_on_past': False,
       'email_on_failure': False,
       'email_on_retry': False,
       'retries': 1,
       'retry_delay': timedelta(minutes=5)
   }
   
   # Per-task resource requests/limits for the Kubernetes Executor
   test_config2 = {
       "KubernetesExecutor": {
           "request_memory": "400Mi",
           "limit_memory": "400Mi",
           "request_cpu": "400m",
           "limit_cpu": "400m",
       }
   }
   
   
   # Using a DAG context manager, you don't have to specify the dag property of each task
   with DAG('s3_upload_copy_delete',
            start_date=datetime(2019, 1, 1),
            max_active_runs=1,
         schedule_interval='0 12 8-14,22-28 * 6',  # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
         default_args=default_args,
         catchup=False  # set to False if you don't want historical DAG runs to backfill
            ) as dag:
   
       t0 = EmptyOperator(task_id='start')
   
       for i in range(0, 10):  # creates 10 upload -> copy -> delete chains (30 tasks)
           generate_files = PythonOperator(
               task_id='generate_file_{0}_{1}'.format(name, i),  # note the task id is dynamic
               python_callable=upload_to_s3,
               op_kwargs={'file_name': i},
               executor_config=test_config2  # apply the Kubernetes Executor resource overrides defined above
           )
   
           copy_files = S3CopyObjectOperator(
               task_id='copy_{0}_file_{1}'.format(name, i),
               # source must match the key written by upload_to_s3 so the lineage graph connects
               source_bucket_key='globetelecom/{0}_file_{1}.txt'.format(name, i),
               dest_bucket_key='globetelecom/copy_{0}_file_{1}.txt'.format(name, i),
               source_bucket_name=BUCKET,
               dest_bucket_name=BUCKET,
               aws_conn_id=S3_CONN_ID
           )
   
           delete_files = S3DeleteObjectsOperator(
               task_id='delete_{0}_file_{1}'.format(name, i),
               # delete the object the copy task produced
               keys='globetelecom/copy_{0}_file_{1}.txt'.format(name, i),
               bucket=BUCKET,
               aws_conn_id=S3_CONN_ID
           )
   
           t0 >> generate_files >> copy_files >> delete_files  # keep this inside the loop body
   ```
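   
   As a quick sanity check on the example itself, here is a small standalone sketch (a hypothetical helper, not part of the DAG or the provider) that mirrors the keys the three tasks reference for each loop iteration and asserts that each step consumes exactly what the previous one produced; key mismatches of this kind are what show up as a disconnected lineage graph between the hook-sourced events and the operator events:
   
   ```
   # Hypothetical, standalone sketch: mirrors the S3 keys the DAG's tasks use for
   # iteration i and checks that each step reads exactly what the previous one wrote.
   name = 'workshop'
   
   def keys_for(i):
       uploaded = 'globetelecom/{0}_file_{1}.txt'.format(name, i)        # written by upload_to_s3
       copy_source = 'globetelecom/{0}_file_{1}.txt'.format(name, i)     # S3CopyObjectOperator source_bucket_key
       copy_dest = 'globetelecom/copy_{0}_file_{1}.txt'.format(name, i)  # S3CopyObjectOperator dest_bucket_key
       deleted = 'globetelecom/copy_{0}_file_{1}.txt'.format(name, i)    # S3DeleteObjectsOperator keys
       return uploaded, copy_source, copy_dest, deleted
   
   for i in range(10):
       uploaded, copy_source, copy_dest, deleted = keys_for(i)
       assert uploaded == copy_source, "copy must read the object the upload produced"
       assert copy_dest == deleted, "delete must target the object the copy produced"
   ```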
   
   ### Committer
   
   - [X] I acknowledge that I am a maintainer/committer of the Apache Airflow project.

