mszpot-future-processing commented on issue #41306: URL: https://github.com/apache/airflow/issues/41306#issuecomment-2275995248
Ok, @Lee-W, the goal is to have a single task instance that will create n-number of glue jobs using `expand` method.   Each Glue job will have a set of static arguments (`partial`), the rest is going to be injected with `expand`. Current code I got is as follows and fails due to `return` not being able to serialize `GlueJobOperator`: ``` import os import sys from datetime import datetime, timedelta from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.providers.amazon.aws.operators.glue import GlueJobOperator from airflow.utils.task_group import TaskGroup from airflow.decorators import task_group, task, dag from airflow.operators.python import PythonOperator sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) from utils.environment_config import EnvironmentConfig # noqa: E402 config = EnvironmentConfig(__file__) import json params_one = ["value"] params_two = ["1","2"] params_three = [4, 12, 52] params_four = [3] param_five = ["col"] playground_bucket = config.get_environment_variable("playground_bucket_name", default_var="undefined") intermittent_data_location = config.get_environment_variable("stage3_output_intermittent_location", default_var="undefined") stage3_task_role = config.get_environment_variable("stage3_task_role", default_var="undefined") join_bridge_script = config.get_bridge_script("join_bridge_script.py") #default_args={ "donot_pickle": "True" } @dag(dag_id='chore_task_group_stage3', schedule=None, catchup=False) def pipeline(): @task def lag_tasks_with_filter( param_one, param_two, param_three, param_four, param_five, lag_task_role, intermittent_data_location, playground_bucket ): return GlueJobOperator( task_id=f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}", job_name=config.generate_job_name(f"param_four{param_four}-weeks{param_three}-" + f"filter{param_five}-job-{param_one}-{param_two}"), script_location=config.get_bridge_script("lags_bridge_script.py"), 
iam_role_name=lag_task_role, script_args={ "--lagWithCatPath": f"s3://{intermittent_data_location}/output/with_cat" + f"/param_one={param_one}/param_two={param_two}", "--rawDataInputPath": f"s3://{playground_bucket}/output/oneyear" + f"/param_one={param_one}/param_two={param_two}/", "--numberOfLagWeeks": str(param_four), "--windowSizeWeeks": str(param_three), "--filterCol": param_five, "--taskId": f"create_task_{param_one}_{param_two}_w{param_three}param_four{param_four}param_five{param_five}", }, create_job_kwargs={ "WorkerType": "G.2X", "NumberOfWorkers": 5, "GlueVersion": "4.0", "DefaultArguments": { "--job-language": "python", "--enable-job-insights": "true", "--enable-metrics": "true", "--enable-auto-scaling": "true", "--enable-observability-metrics": "true", "--TempDir": f"s3://{config.get_environment_variable('glue_tmp_dir_location', default_var='undefined')}", "--extra-py-files": config.get_asset_file_location( "ctc_telligence_forecasting_data_product-0.0.1-py3-none-any.whl" ), "--enable-spark-ui": "true", "--spark-event-logs-path": f"s3://{config.get_environment_variable('glue_spark_ui_logs_location', default_var='undefined')}", }, }, update_config=True, ) ts = DummyOperator(task_id='start') te = DummyOperator(task_id='end') t1 = lag_tasks_with_filter.partial(lag_task_role=stage3_task_role, intermittent_data_location=intermittent_data_location, playground_bucket=playground_bucket).expand(param_one=params_one, param_two=params_two, param_three=params_three, param_four=params_four, param_five=param_five) # setting dependencies ts >> t1 >> te pipeline() ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
