mszpot-future-processing commented on issue #41306:
URL: https://github.com/apache/airflow/issues/41306#issuecomment-2275995248

   Ok, @Lee-W, the goal is to have a single task instance that will create an 
n-number of Glue jobs using the `expand` method.
   
   
![obraz](https://github.com/user-attachments/assets/817f11cd-bade-454a-a1d7-016fff8ee94d)
   
![obraz](https://github.com/user-attachments/assets/cdca7da4-4928-44bc-9497-32811d3ef472)
   
   Each Glue will have a set of static arguments (`partial`), rest is going to 
be injected with `expand`.
   
   The current code I have is shown below; it fails because the 
`GlueJobOperator` returned from the task cannot be serialized:
   
   
   ```
   import os
   import sys
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
   from airflow.utils.task_group import TaskGroup
   from airflow.decorators import task_group, task, dag
   from airflow.operators.python import PythonOperator
   
   sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
   
   from utils.environment_config import EnvironmentConfig  # noqa: E402
   
   config = EnvironmentConfig(__file__)
   
   import json
   
   
   params_one = ["value"]
   params_two = ["1","2"]
   
   params_three = [4, 12, 52]
   params_four = [3]
   param_five = ["col"]
   
   playground_bucket = 
config.get_environment_variable("playground_bucket_name", 
default_var="undefined")
   intermittent_data_location = 
config.get_environment_variable("stage3_output_intermittent_location", 
default_var="undefined")
   stage3_task_role = config.get_environment_variable("stage3_task_role", 
default_var="undefined")
   join_bridge_script = config.get_bridge_script("join_bridge_script.py")
   
   
   #default_args={ "donot_pickle": "True" }
   @dag(dag_id='chore_task_group_stage3', schedule=None, catchup=False)
   def pipeline():
   
       @task
       def lag_tasks_with_filter(
           param_one,
           demo,
           param_three,
           param_four,
           ,
           lag_task_role,
           intermittent_data_location,
           playground_bucket
       ):
   
           return GlueJobOperator(
               
task_id=f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
               
job_name=config.generate_job_name(f"param_four{param_four}-weeks{param_three}-" 
+ f"filter{param_five}-job-{param_one}-{param_two}"),
               
script_location=config.get_bridge_script("lags_bridge_script.py"),
               iam_role_name=lag_task_role,
               script_args={
                       "--lagWithCatPath": 
f"s3://{intermittent_data_location}/output/with_cat" + 
f"/param_one={param_one}/param_two={param_two}",
                       "--rawDataInputPath": 
f"s3://{playground_bucket}/output/oneyear" + 
f"/param_one={param_one}/param_two={param_two}/",
                       "--numberOfLagWeeks": str(param_four),
                       "--windowSizeWeeks": str(param_three),
                       "--filterCol": param_five,
                       "--taskId": 
f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}",
    
               },
               create_job_kwargs={
                       "WorkerType": "G.2X",
                       "NumberOfWorkers": 5,
                       "GlueVersion": "4.0",
                       "DefaultArguments": {
                                           "--job-language": "python",
                                           "--enable-job-insights": "true",
                                           "--enable-metrics": "true",
                                           "--enable-auto-scaling": "true",
                                           "--enable-observability-metrics": 
"true",
                                           "--TempDir": 
f"s3://{config.get_environment_variable('glue_tmp_dir_location', 
default_var='undefined')}",
                                           "--extra-py-files": 
config.get_asset_file_location(
                                               
"ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl"
                                           ),
                                           "--enable-spark-ui": "true",
                                           "--spark-event-logs-path": 
f"s3://{config.get_environment_variable('glue_spark_ui_logs_location', 
default_var='undefined')}",
                                       },
               },
               update_config=True,
           )
   
   
       ts = DummyOperator(task_id='start')
       te = DummyOperator(task_id='end')
       t1 = lag_tasks_with_filter.partial(lag_task_role=stage3_task_role, 
intermittent_data_location=intermittent_data_location, 
playground_bucket=playground_bucket).expand(param_one=params_one, 
param_two=params_two, param_three=params_three, param_four=params_four, 
param_five=param_five)
   
       # setting dependencies
       ts >> t1 >> te
   
   pipeline()
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to