mik-laj commented on a change in pull request #9331: URL: https://github.com/apache/airflow/pull/9331#discussion_r446841962
########## File path: airflow/api_connexion/endpoints/log_endpoint.py ########## @@ -15,12 +15,83 @@ # specific language governing permissions and limitations # under the License. -# TODO(mik-laj): We have to implement it. -# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135 +from flask import Response, current_app, request +from itsdangerous.exc import BadSignature +from itsdangerous.url_safe import URLSafeSerializer +from airflow.api_connexion.exceptions import BadRequest, NotFound +from airflow.api_connexion.schemas.log_schema import LogResponseObject, logs_schema +from airflow.models import DagRun +from airflow.utils.log.log_reader import TaskLogReader +from airflow.utils.session import provide_session -def get_log(): + +@provide_session +def get_log(session, dag_id, dag_run_id, task_id, task_try_number, + full_content=False, token=None): """ Get logs for specific task instance """ - raise NotImplementedError("Not implemented yet.") + if not token: + metadata = {} + else: + key = current_app.config["SECRET_KEY"] + try: + metadata = URLSafeSerializer(key).loads(token) + except BadSignature: + raise BadRequest("Bad Signature. 
Please sign your token with URLSafeSerializer") + + if metadata.get('download_logs', None) and metadata['download_logs']: + full_content = True + + if full_content: + metadata['download_logs'] = True + else: + metadata['download_logs'] = False + + task_log_reader = TaskLogReader() + if not task_log_reader.is_supported: + raise BadRequest("Task log handler does not support read logs.") + + query = session.query(DagRun).filter(DagRun.dag_id == dag_id) + dag_run = query.filter(DagRun.run_id == dag_run_id).first() + if not dag_run: + raise NotFound("DAG Run not found") + + ti = dag_run.get_task_instance(task_id, session) + if ti is None: + metadata['end_of_log'] = True + raise BadRequest(detail="Task instance did not exist in the DB") + + try: + dag = current_app.dag_bag.get_dag(dag_id) + if dag: + ti.task = dag.get_task(ti.task_id) + + return_type = request.accept_mimetypes.best_match(['text/plain', 'application/json']) + + # return_type would be either the above two or None + + if return_type == 'application/json' or return_type is None: # default + logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata) + logs = logs[0] if task_try_number is not None else logs + return logs_schema.dump(LogResponseObject(continuation_token=str(metadata), + content=logs) + ) + # text/plain. Stream + logs = task_log_reader.read_log_stream(ti, task_try_number, metadata) + + return Response( + logs, + headers={"Content-Type": return_type} + ) + + except AttributeError as err: + error_message = [ + f"Task log handler {task_log_reader} does not support read logs.\n{str(err)}\n" + ] + metadata['end_of_log'] = True + return logs_schema.dump( + LogResponseObject(continuation_token=str(metadata), + content=str(error_message)) + ) Review comment: This situation will never happen. This case was handled earlier. Errors should be returned as a response with an error code. Logs should not be used to send errors to the client. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
