mik-laj commented on a change in pull request #9331: URL: https://github.com/apache/airflow/pull/9331#discussion_r441368237
########## File path: airflow/api_connexion/endpoints/log_endpoint.py ########## @@ -14,13 +14,99 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import logging +import os +from io import BytesIO -# TODO(mik-laj): We have to implement it. -# Do you want to help? Please look at: https://github.com/apache/airflow/issues/8135 +from flask import Response, current_app, request +from itsdangerous.exc import BadSignature +from itsdangerous.url_safe import URLSafeSerializer +from airflow import models, settings +from airflow.api_connexion.exceptions import BadRequest, NotFound +from airflow.configuration import conf +from airflow.models import DagRun +from airflow.settings import STORE_SERIALIZED_DAGS +from airflow.utils.helpers import render_log_filename +from airflow.utils.session import provide_session -def get_log(): +if os.environ.get('SKIP_DAGS_PARSING') != 'True': + dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS) +else: + dagbag = models.DagBag(os.devnull, include_examples=False) + + +@provide_session +def get_log(session, dag_id, dag_run_id, task_id, task_try_number, + full_content=False, token=None): """ Get logs for specific task instance """ - raise NotImplementedError("Not implemented yet.") + if not token: + metadata = {} + else: + key = current_app.config["SECRET_KEY"] + try: + metadata = URLSafeSerializer(key).loads(token) + except BadSignature: + raise BadRequest("Bad Signature. 
Please sign your token with URLSafeSerializer") + + if metadata.get('download_logs', None) and metadata['download_logs']: + full_content = True + + if full_content: + metadata['download_logs'] = True + else: + metadata['download_logs'] = False + query = session.query(DagRun).filter(DagRun.dag_id == dag_id) + dag_run = query.filter(DagRun.run_id == dag_run_id).first() + if not dag_run: + raise NotFound("Specified DagRun not found") + + logger = logging.getLogger('airflow.task') + task_log_reader = conf.get('logging', 'task_log_reader') + handler = next((handler for handler in logger.handlers + if handler.name == task_log_reader), None) + + ti = dag_run.get_task_instance(task_id, session) + try: + if ti is None: + logs = ["*** Task instance did not exist in the DB\n"] + metadata['end_of_log'] = True + else: + dag = dagbag.get_dag(dag_id) + ti.task = dag.get_task(ti.task_id) + logs, metadatas = handler.read(ti, task_try_number, metadata=metadata) + metadata = metadatas[0] + + if full_content: Review comment: ```suggestion if full_content or request.headers.get('Content-Type', None) == 'text/plain': ``` We return the full log for plain-text responses. In this format, we don't have an easy way to return a token. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
