Subham-KRLX commented on issue #67178:
URL: https://github.com/apache/airflow/issues/67178#issuecomment-4505847887
> FYI.. I still faced the same issue with the fix you provided. There is
another task where we see some task logs, sharing task logs where it worked vs
where it failed for the same table. For the failed one, job was submitted and
succeeded fine in EMR
>
> **Passed:**
>
> ```
> Reading remote log from Cloudwatch log_group:
arn:aws:logs:us-west-2:xxx:log-group:airflow-abc-MwaaEnvironment-Task
log_stream:
dag_id=mynamespace_xxxxx/run_id=manual__2026-05-20T06_53_06.800846+00_00/task_id=KP.mynamespace_csv_ingest_mytable/attempt=1.log
> [2026-05-20, 12:35:55] WARNING -
/usr/local/airflow/.local/lib/python3.12/site-packages/flask_sqlalchemy/model.py:121:
SAWarning: This declarative base already contains a class with the same class
name and module name as iam.MWAASession, and will be replaced in the
string-lookup table. super(BindMetaMixin, cls).__init__(name, bases, d):
source="py.warnings"
> [2026-05-20, 12:35:55] INFO - DAG bundles loaded: dags-folder:
source="airflow.dag_processing.bundles.manager.DagBundlesManager"
> [2026-05-20, 12:35:55] INFO - Filling up the DagBag from
/usr/local/airflow/dags/mynamespace_ns/csv_load_dags/xxxxx.py:
source="airflow.models.dagbag.DagBag"
> [2026-05-20, 12:35:55] WARNING -
/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/models/connection.py:471:
DeprecationWarning: Using Connection.get_connection_from_secrets from
`airflow.models` is deprecated.Please use `get` on Connection from
sdk(`airflow.sdk.Connection`) instead warnings.warn(: source="py.warnings"
> [2026-05-20, 12:35:56] INFO - Connection Retrieved 'aws_default':
source="airflow.hooks.base"
> [2026-05-20, 12:35:56] INFO - Starting job on Application: myappid:
source="airflow.task.operators.edfx_emr_serverless_operator.EdfxEmrServerlessStartJobOperator"
> [2026-05-20, 12:35:56] INFO - EMR serverless job started:
00g5ql0rdccnpg0n:
source="airflow.task.operators.edfx_emr_serverless_operator.EdfxEmrServerlessStartJobOperator"
> [2026-05-20, 12:35:56] INFO - Serverless Job status is: SUBMITTED -
SUBMITTED: source="waiter_with_logging"
> [2026-05-20, 12:36:56] INFO - Serverless Job status is: RUNNING:
source="waiter_with_logging"
> [2026-05-20, 12:37:56] INFO - Pushing xcom:
ti="RuntimeTaskInstance(id=UUID('019cc78ed-941e-7f0f656105c6'),
task_id='KP.mynamespace_csv_ingest_mytable', dag_id='mynamespace_xxxxx',
run_id='manual__2026-05-20T06:53:06.800846+00:00', try_number=1, map_index=-1,
hostname='ip-10-151-47-166.us-west-2.compute.internal', context_carrier={},
task=<Task(EdfxEmrServerlessStartJobOperator):
KP.mynamespace_csv_ingest_mytable>,
bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0,
start_date=datetime.datetime(2026, 5, 20, 7, 5, 55, 318443,
tzinfo=datetime.timezone.utc), end_date=None, state=<TaskInstanceState.RUNNING:
'running'>, is_mapped=False, rendered_map_index=None,
log_url='https://a5cca3ac-1398-448f-a42f-1e87b05867a4-vpce.c29.airflow.us-west-2.on.awsdags/mynamespace_xxxxx/runs/manual__2026-05-20T06%3A53%3A06.800846%2B00%3A00/tasks/KP.mynamespace_csv_ingest_mytable?try_number=1%27)%22:
source="task"
> [2026-05-20, 12:37:56] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_s3_logs": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_mytable":
run_id="manual__2026-05-20T06:53:06.800846+00:00": map_index=-1: source="task"
> [2026-05-20, 12:37:56] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_cloudwatch_logs": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_mytable":
run_id="manual__2026-05-20T06:53:06.800846+00:00": map_index=-1: source="task"
> [2026-05-20, 12:37:56] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_dashboard": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_mytable":
run_id="manual__2026-05-20T06:53:06.800846+00:00": map_index=-1: source="task"
> [2026-05-20, 12:37:56] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_logs": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_mytable":
run_id="manual__2026-05-20T06:53:06.800846+00:00": map_index=-1: source="task"
> ```
>
> **Failed:**
>
> ```
> [2026-05-20, 16:01:25] INFO - Starting job on Application: myappid:
source="airflow.task.operators.edfx_emr_serverless_operator.EdfxEmrServerlessStartJobOperator"
> [2026-05-20, 16:01:25] INFO - EMR serverless job started: jobid:
source="airflow.task.operators.edfx_emr_serverless_operator.EdfxEmrServerlessStartJobOperator"
> [2026-05-20, 16:01:25] INFO - Using backported waiter_with_logging.wait
(module=waiter_with_logging,
file=/usr/local/airflow/dags/mynamespace_ns/_commonutil/waiter_with_logging.py,
max_attempts=480, delay=60s, args={'applicationId': 'myappid', 'jobRunId':
'jobid'}): source="waiter_with_logging"
> [2026-05-20, 16:01:25] INFO - Serverless Job status is [attempt 1/480]:
SUBMITTED - SUBMITTED: source="waiter_with_logging"
> [2026-05-20, 16:01:41] ERROR - Server indicated the task shouldn't be
running anymore. Terminating process:
detail={"detail":{"reason":"not_running","message":"TI is no longer in the
running state and task should terminate","current_state":"failed"}}:
source="task"
> [2026-05-20, 16:01:41] INFO - Stopping job run with jobId - jobid:
source="airflow.task.operators.edfx_emr_serverless_operator.EdfxEmrServerlessStartJobOperator"
> [2026-05-20, 16:01:41] ERROR - Task failed with exception:
source="task"ClientError: An error occurred (AccessDeniedException) when
calling the CancelJobRun operation: User:
arn:aws:sts::accid:assumed-role/abc-MwaaEnvRole/AmazonMWAA-iamrole is not
authorized to perform: emr-serverless:CancelJobRun on resource:
arn:aws:emr-serverless:us-west-2:accid:/applications/myappid/jobruns/jobid
because no identity-based policy allows the emr-serverless:CancelJobRun action
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
line 920 in run
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
line 1215 in _execute_task
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py",
line 397 in wrapper
> File
"/usr/local/airflow/dags/mynamespace_ns/_commonutil/edfx_emr_serverless_operator.py",
line 101 in execute
> File
"/usr/local/airflow/dags/mynamespace_ns/_commonutil/waiter_with_logging.py",
line 101 in wait
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
line 891 in _on_term
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/providers/amazon/aws/operators/emr.py",
line 1294 in on_kill
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/botocore/client.py",
line 601 in _api_call
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/botocore/context.py",
line 123 in wrapper
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/botocore/client.py",
line 1074 in _make_api_call
> [2026-05-20, 16:01:41] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_s3_logs": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_endispositionreason":
run_id="manual__2026-05-20T09:55:43.497627+00:00": map_index=-1: source="task"
> [2026-05-20, 16:01:41] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_cloudwatch_logs": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_endispositionreason":
run_id="manual__2026-05-20T09:55:43.497627+00:00": map_index=-1: source="task"
> [2026-05-20, 16:01:41] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_dashboard": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_endispositionreason":
run_id="manual__2026-05-20T09:55:43.497627+00:00": map_index=-1: source="task"
> [2026-05-20, 16:01:41] WARNING - No XCom value found; defaulting to None.:
key="emr_serverless_logs": dag_id="mynamespace_xxxxx":
task_id="KP.mynamespace_csv_ingest_endispositionreason":
run_id="manual__2026-05-20T09:55:43.497627+00:00": map_index=-1: source="task"
> [2026-05-20, 16:01:41] ERROR - Top level error:
source="task"UndefinedError:
'airflow.sdk.execution_time.task_runner.RuntimeTaskInstance object' has no
attribute 'mark_success_url'
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
line 1353 in main
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
line 1330 in finalize
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py",
line 1161 in _send_task_error_email
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
line 411 in _get_email_subject_content
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/models/taskinstance.py",
line 408 in render
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/utils/helpers.py",
line 244 in render_template_to_string
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/airflow/utils/helpers.py",
line 239 in render_template
> File "<template>", line 26 in root
> File
"/usr/local/airflow/.local/lib/python3.12/site-packages/jinja2/runtime.py",
line 859 in _fail_with_undefined_error
> [2026-05-20, 16:01:41] WARNING - Process exited abnormally: exit_code=1:
source="task"
> [2026-05-20, 16:01:41] ERROR - Task killed!: source="task"
> ```
The waiter fix handles transient AWS throttling errors. However, your failed
log shows something different: Airflow itself terminates the task about 16
seconds after the job is submitted (16:01:25 to 16:01:41), with "TI is no
longer in the running state and task should terminate" and `current_state`
already set to `failed`. This appears to be a separate issue in which the task
instance is marked as failed in the database while the waiter is still
running. It might be a different timeout or a task state management problem.
Can you open a separate issue for this specific behavior so it can be
investigated independently?
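
For the new issue, it may help to state the timing explicitly. A minimal
sketch, assuming the timestamps and the `delay=60s` value are taken verbatim
from the failed log above (the variable names are illustrative only, not part
of Airflow or the provider):

```python
from datetime import datetime

# Timestamp format as displayed in the task log above.
FMT = "%Y-%m-%d, %H:%M:%S"

# First waiter poll ("Serverless Job status is [attempt 1/480]").
first_poll_at = datetime.strptime("2026-05-20, 16:01:25", FMT)
# "Server indicated the task shouldn't be running anymore."
terminated_at = datetime.strptime("2026-05-20, 16:01:41", FMT)

elapsed_seconds = int((terminated_at - first_poll_at).total_seconds())
waiter_delay_seconds = 60  # delay=60s from the backported waiter log line

print(f"terminated {elapsed_seconds}s after the first poll")
print("before the second poll:", elapsed_seconds < waiter_delay_seconds)
```

If that reading is right, the termination arrives before the waiter's second
poll, which would suggest the waiter is not what ends the task.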