paulsyl commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-626134359
> I ran some end-to-end tests using this code in one of my DAG files. It's
> working ok if you make those changes following my comments.
>
> Right now I'm trying to retrieve the logs generated by Glue jobs so that
> we could print out the logs in Airflow. Without the logs in Airflow,
> debugging failed jobs is a hassle.
>
> Update:
> Obtaining logs is easy: just add this in the operator class
>
> ```python
> GLUE_LOGS_GROUP = "/aws-glue/jobs/output"
> GLUE_ERRS_GROUP = "/aws-glue/jobs/error"
> ```
>
> as class attributes
>
> ```python
> def get_glue_logs(self, log_group_name, log_stream_name):
>     """Glue logs are too chatty, only get the ones that have errors."""
>     self.log.info('Glue job logs output from group %s:', log_group_name)
>     for event in self.get_logs_hook().get_log_events(
>         log_group_name,
>         log_stream_name,
>     ):
>         event_dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
>         event_msg = event['message']
>         # Glue logs are extremely chatty; only keep entries that contain "error"
>         if "error" in event_msg:
>             self.log.info("[%s] %s", event_dt.isoformat(), event_msg)
>
> def get_logs_hook(self):
>     """Create and return an AwsLogsHook."""
>     return AwsLogsHook(
>         aws_conn_id=self.aws_conn_id,
>         region_name=self.awslogs_region,
>     )
> ```
>
> as class methods, and call `get_glue_logs` in `execute()`
>
> ```python
> glue_job_run_id = glue_job_run['JobRunId']
>
> self.get_glue_logs(self.GLUE_LOGS_GROUP, glue_job_run_id)
> ```
I would be very careful about making this mandatory in the operator class. It
is entirely possible that this operator could be used in a secure environment
where access to CloudWatch logs is not always possible. In the event of an
error, you can retrieve the `ErrorMessage` from the Glue API and return it in
the Airflow log.
I have used the following to surface a failed status in the Airflow logs:
```python
errored_states = ['FAILED', 'STOPPED', 'TIMEOUT']

elif job_run_state in errored_states:
    time.sleep(3)  # Wait for the API to populate the error message
    job_message = "Exiting Job " + run_id + " Run State: " + job_run_state
    if job_run_state == 'FAILED':
        # Use .get() because ErrorMessage is optional in the response
        self.log.info("JOB ERROR: %s", job_status['JobRun'].get('ErrorMessage'))
    raise AirflowException(job_message)
```
`ErrorMessage` is an optional field in the API response, so guard against it being absent.
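Since `ErrorMessage` can be missing, the state check above can be wrapped in a small helper that guards the lookup. This is a hypothetical sketch: the function name is mine, the input is assumed to have the shape of a boto3 `glue.get_job_run()` response, and `RuntimeError` stands in for `AirflowException` to keep the example dependency-free:

```python
ERRORED_STATES = ('FAILED', 'STOPPED', 'TIMEOUT')


def raise_if_errored(run_id, job_status):
    """Raise if a Glue job run ended in an errored state.

    `job_status` is assumed to look like a boto3 glue.get_job_run()
    response; 'ErrorMessage' is optional there, so .get() avoids a
    KeyError. In an operator you would raise AirflowException instead
    of RuntimeError.
    """
    job_run = job_status['JobRun']
    state = job_run['JobRunState']
    if state in ERRORED_STATES:
        error_message = job_run.get('ErrorMessage', '(no ErrorMessage returned)')
        raise RuntimeError(
            f"Exiting Job {run_id} Run State: {state} - {error_message}"
        )
    return state


# A succeeded run passes through untouched:
print(raise_if_errored('jr_1', {'JobRun': {'JobRunState': 'SUCCEEDED'}}))
```

Keeping this as a pure function means it can be unit-tested with fake responses, without any CloudWatch access.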