nikhi-suthar commented on code in PR #26269:
URL: https://github.com/apache/airflow/pull/26269#discussion_r979208718


##########
airflow/providers/amazon/aws/hooks/glue.py:
##########
@@ -151,85 +153,150 @@ def print_job_logs(
         job_failed: bool = False,
         next_token: Optional[str] = None,
     ) -> Optional[str]:
-        """Prints the batch of logs to the Airflow task log and returns 
nextToken."""
-        log_client = boto3.client('logs')
-        response = {}
-
-        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
+        """Prints the batch of Glue cloudwatch logs to the Airflow task log and returns nextToken."""
+        credentials = self.get_credentials(region_name=self.conn_region_name)
+        log_client = boto3.client(
+            'logs',
+            region_name=self.conn_region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
         log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
         log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
         log_group_name = f'{log_group_prefix}/{log_group_suffix}'
-
         try:
-            if next_token:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                    nextToken=next_token,
-                )
-            else:
-                response = log_client.filter_log_events(
-                    logGroupName=log_group_name,
-                    logStreamNames=[run_id],
-                    filterPattern=filter_pattern,
-                )
-            if len(response['events']):
+            self.log.info('Glue Job Run Logs')
+            response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            while len(response['events']) > 0:
+                next_token = response["nextForwardToken"]
                 messages = '\t'.join([event['message'] for event in response['events']])
-                self.log.info('Glue Job Run Logs:\n\t%s', messages)
-
+                self.log.info('\n\t%s', messages)
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            return None
         except log_client.exceptions.ResourceNotFoundException:
             self.log.warning(
                 'No new Glue driver logs found. This might be because there are no new logs, '
                 'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                 f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
             )
+        return None
 
-        # If no new log events are available, filter_log_events will return None.
-        # In that case, check the same token again next pass.
-        return response.get('nextToken') or next_token
+    def print_output_logs(
+        self, job_name: str, run_id: str, log_group_name: str, log_client, next_token=None
+    ) -> Optional[str]:
+        """Prints Glue cloudwatch logs continuously to the Airflow task log 
and returns nextToken."""
+        log_group_name = log_group_name + '/' + DEFAULT_LOG_SUFFIX
+        try:
+            if next_token is None:
+                response = log_client.get_log_events(logGroupName=log_group_name, logStreamName=run_id)
+            else:
+                response = log_client.get_log_events(
+                    logGroupName=log_group_name, logStreamName=run_id, nextToken=next_token
+                )
+            if len(response['events']) > 0:
+                for event in response['events']:
+                    self.log.info("[%s] %s", job_name, event['message'])
+                return response["nextForwardToken"]
+            else:
+                return None
+        except log_client.exceptions.ResourceNotFoundException:
+            self.log.warning("Waiting for the Glue Job output log stream 
%s/%s", log_group_name, run_id)
+            return None
+        except Exception as E:
+            self.log.warning(str(E))
+            return None
 
-    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
+    def job_completion(
+        self, job_name: str, run_id: str, verbose: bool = False, continuous_logging: bool = False
+    ) -> Dict[str, str]:
         """
         Waits until Glue job with job_name completes or
         fails and return final state if finished.
         Raises AirflowException when the job failed
         :param job_name: unique job name per AWS account
         :param run_id: The job-run ID of the predecessor job run
-        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
-        :return: Dict of JobRunState and JobRunId
+        :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs after completion of the job.
+        (default: False)
+        :param continuous_logging: If True (and verbose also True), then print Glue job output cloudwatch logs
+        continuously (once the log stream is available) in the Airflow Logs. (default: False)
+        :return: Dict of JobRunState and JobRunId (after completion), otherwise a failure exception.
         """
+        region_name = self.conn_region_name
+        self.log.info("region name - %s", region_name)
+        credentials = self.get_credentials(region_name=region_name)
         failed_states = ['FAILED', 'TIMEOUT']
         finished_states = ['SUCCEEDED', 'STOPPED']
+        log_client = boto3.client(
+            'logs',
+            region_name=region_name,
+            aws_access_key_id=credentials.access_key,
+            aws_secret_access_key=credentials.secret_key,
+        )
+        glue_client = self.get_conn()
         next_log_token = None
+        mode = None
         job_failed = False
+        if verbose and continuous_logging:
+            mode = 'CONTINUE'
+            self.log.info("Continuous logging mode is enable.")
+        elif verbose:
+            mode = 'BATCH'
+            self.log.info("Batch logging mode is enable.")
+        else:
+            self.log.info("Logging is disable.")
 
         while True:
             try:
-                job_run_state = self.get_job_state(job_name, run_id)
-                if job_run_state in finished_states:
-                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
-                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
-                if job_run_state in failed_states:
-                    job_failed = True
-                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
-                    self.log.info(job_error_message)
-                    raise AirflowException(job_error_message)
-                else:
-                    self.log.info(
-                        'Polling for AWS Glue Job %s current run state with status %s',
-                        job_name,
-                        job_run_state,
-                    )
-                    time.sleep(self.JOB_POLL_INTERVAL)
-            finally:
-                if verbose:
-                    next_log_token = self.print_job_logs(
+                job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
+                job_run_state = job_run['JobRun']['JobRunState']
+                msg = f'[Batch] The Job {job_name} with run id {run_id} is : {job_run_state}'
+                if mode == 'CONTINUE':
+                    log_group_name = job_run['JobRun']['LogGroupName']
+                    token = self.print_output_logs(
                         job_name=job_name,
                         run_id=run_id,
-                        job_failed=job_failed,
+                        log_group_name=log_group_name,
+                        log_client=log_client,
                         next_token=next_log_token,
                     )
+                    while token is not None:
+                        next_log_token = token
+                        token = self.print_output_logs(
+                            job_name=job_name,
+                            run_id=run_id,
+                            log_group_name=log_group_name,
+                            log_client=log_client,
+                            next_token=next_log_token,
+                        )
+                elif mode == 'BATCH':
+                    msg = f"The Job {job_name} with run id {run_id} is : 
{job_run_state}"
+                    self.log.info(msg)
+                else:
+                    self.log.info(msg)
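
A side note on the client construction in this hunk: `get_credentials()` on the AWS base hook returns botocore's frozen `ReadOnlyCredentials`, which also carries a `token` attribute whenever the hook resolved temporary credentials (an assumed role, for instance). A hedged sketch of building the logs client that way while forwarding the session token too (the helper name is illustrative, not part of this PR):

import boto3

def logs_client_from_hook(hook, region_name: str):
    """Build a CloudWatch Logs client that reuses the hook's resolved credentials."""
    credentials = hook.get_credentials(region_name=region_name)
    return boto3.client(
        'logs',
        region_name=region_name,
        aws_access_key_id=credentials.access_key,
        aws_secret_access_key=credentials.secret_key,
        # None for long-lived keys; required for STS temporary credentials
        aws_session_token=credentials.token,
    )

Without the token, temporary credentials are rejected by CloudWatch Logs even though an access key and secret are present.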

Review Comment:
   Sure will do
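
Stepping back from the hunk: both new helpers page through a CloudWatch log stream with `get_log_events` and its `nextForwardToken`. The documented end-of-stream signal is that the API echoes back the token you passed in; the new code instead stops on an empty `events` batch, which amounts to the same thing for a stream that is no longer being written. A minimal standalone sketch of the token-based pattern (function name and parameters are illustrative only, not part of this PR):

import boto3

def read_log_stream(log_group_name: str, log_stream_name: str, region_name: str) -> None:
    """Print every event in a CloudWatch log stream, paging forward."""
    log_client = boto3.client('logs', region_name=region_name)
    kwargs = {
        'logGroupName': log_group_name,
        'logStreamName': log_stream_name,
        'startFromHead': True,  # read from the oldest events forward
    }
    next_token = None
    while True:
        if next_token is not None:
            kwargs['nextToken'] = next_token
        response = log_client.get_log_events(**kwargs)
        for event in response['events']:
            print(event['message'])
        new_token = response['nextForwardToken']
        # CloudWatch marks end-of-stream by returning the same token
        # that was passed in.
        if new_token == next_token:
            break
        next_token = new_token

For a stream that is still receiving events, the caller would keep the last token and re-poll later instead of breaking, which is what `job_completion` does by threading `next_log_token` through each pass of its polling loop.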


