nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979208718
##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
job_failed: bool = False,
next_token: Optional[str] = None,
) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns nextToken."""
- log_client = boto3.client('logs')
- response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+ credentials = self.get_credentials(region_name=self.conn_region_name)
+ log_client = boto3.client(
+ 'logs',
+ region_name=self.conn_region_name,
+ aws_access_key_id=credentials.access_key,
+ aws_secret_access_key=credentials.secret_key,
+ )
        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
        log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
try:
- if next_token:
- response = log_client.filter_log_events(
- logGroupName=log_group_name,
- logStreamNames=[run_id],
- filterPattern=filter_pattern,
- nextToken=next_token,
- )
- else:
- response = log_client.filter_log_events(
- logGroupName=log_group_name,
- logStreamNames=[run_id],
- filterPattern=filter_pattern,
- )
- if len(response['events']):
+ self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+ while len(response['events']) > 0:
+ next_token = response["nextForwardToken"]
                messages = '\t'.join([event['message'] for event in response['events']])
- self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+ self.log.info('\n\t%s', messages)
+ response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+ )
+ return None
except log_client.exceptions.ResourceNotFoundException:
            self.log.warning(
                'No new Glue driver logs found. This might be because there are no new logs, '
                'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
            )
+ return None
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log and returns nextToken."""
+ log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+ try:
+ if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+ else:
+ response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+ )
+ if len(response['events']) > 0:
+ for event in response['events']:
+ self.log.info("[%s] %s", job_name, event['message'])
+ return response["nextForwardToken"]
+ else:
+ return None
+ except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream %s/%s", log_group_name, run_id)
+ return None
+ except Exception as E:
+ self.log.warning(str(E))
+ return None
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
"""
Waits until Glue job with job_name completes or
fails and return final state if finished.
Raises AirflowException when the job failed
:param job_name: unique job name per AWS account
:param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the jobs.
+            (default: False)
+        :param continuous_logging: If True (and verbose also True), then print Glue job output cloudwatch logs
+            continuously (once the log stream is available) in the Airflow Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion); otherwise raises a failure exception.
"""
+ region_name = self.conn_region_name
+ self.log.info("region name - %s", region_name)
+ credentials = self.get_credentials(region_name=region_name)
failed_states = ['FAILED', 'TIMEOUT']
finished_states = ['SUCCEEDED', 'STOPPED']
+ log_client = boto3.client(
+ 'logs',
+ region_name=region_name,
+ aws_access_key_id=credentials.access_key,
+ aws_secret_access_key=credentials.secret_key,
+ )
+ glue_client = self.get_conn()
next_log_token = None
+ mode = None
job_failed = False
+ if verbose and continuous_logging:
+ mode = 'CONTINUE'
+            self.log.info("Continuous logging mode is enabled.")
+ elif verbose:
+ mode = 'BATCH'
+            self.log.info("Batch logging mode is enabled.")
+ else:
+            self.log.info("Logging is disabled.")
while True:
try:
- job_run_state = self.get_job_state(job_name, run_id)
- if job_run_state in finished_states:
-                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
- return {'JobRunState': job_run_state, 'JobRunId': run_id}
- if job_run_state in failed_states:
- job_failed = True
-                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
- self.log.info(job_error_message)
- raise AirflowException(job_error_message)
- else:
- self.log.info(
-                        'Polling for AWS Glue Job %s current run state with status %s',
- job_name,
- job_run_state,
- )
- time.sleep(self.JOB_POLL_INTERVAL)
- finally:
- if verbose:
- next_log_token = self.print_job_logs(
+                job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
+ job_run_state = job_run['JobRun']['JobRunState']
+                msg = f'[Batch] The Job {job_name} with run id {run_id} is : {job_run_state}'
+ if mode == 'CONTINUE':
+ log_group_name = job_run['JobRun']['LogGroupName']
+ token = self.print_output_logs(
job_name=job_name,
run_id=run_id,
- job_failed=job_failed,
+ log_group_name=log_group_name,
+ log_client=log_client,
next_token=next_log_token,
)
+ while token is not None:
+ next_log_token = token
+ token = self.print_output_logs(
+ job_name=job_name,
+ run_id=run_id,
+ log_group_name=log_group_name,
+ log_client=log_client,
+ next_token=next_log_token,
+ )
+ elif mode == 'BATCH':
+                    msg = f"The Job {job_name} with run id {run_id} is : {job_run_state}"
+ self.log.info(msg)
+ else:
+ self.log.info(msg)
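
For reference, a minimal standalone sketch of the get_log_events pagination idiom the new code relies on (not part of the PR; the log group and stream names below are hypothetical, and boto3 is assumed to resolve region and credentials from the environment). CloudWatch pages forward via nextForwardToken, and an empty 'events' page is what terminates the while-loops above:

import boto3

def tail_stream(log_group: str, log_stream: str) -> None:
    client = boto3.client('logs')
    # startFromHead=True reads the stream from the beginning; the default
    # (False) starts from the most recent events instead.
    response = client.get_log_events(
        logGroupName=log_group, logStreamName=log_stream, startFromHead=True
    )
    while response['events']:
        for event in response['events']:
            print(event['message'])
        response = client.get_log_events(
            logGroupName=log_group,
            logStreamName=log_stream,
            nextToken=response['nextForwardToken'],
            startFromHead=True,
        )

# Hypothetical names, e.g.:
# tail_stream('/aws-glue/jobs/output', 'jr_0123456789abcdef')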
Review Comment:
Sure, will do.
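
For context, a hedged usage sketch of the flag this PR introduces (the job name and run id are placeholders; assumes the hook's existing constructor and that a job run was already started). Passing verbose=True together with continuous_logging=True selects the 'CONTINUE' mode above:

from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

hook = GlueJobHook(job_name='example-job')  # hypothetical job name
run_id = 'jr_0123456789abcdef'  # placeholder; normally returned when the run is started
# Poll until the run finishes, streaming the output log stream as it appears:
result = hook.job_completion('example-job', run_id, verbose=True, continuous_logging=True)
print(result['JobRunState'])  # e.g. 'SUCCEEDED'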