ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r442125343



##########
File path: airflow/api_connexion/endpoints/log_endpoint.py
##########
@@ -15,12 +15,102 @@
 # specific language governing permissions and limitations
 # under the License.
 
-# TODO(mik-laj): We have to implement it.
-#     Do you want to help? Please look at: 
https://github.com/apache/airflow/issues/8135
+import os
 
+from flask import Response, current_app, request
+from itsdangerous.exc import BadSignature
+from itsdangerous.url_safe import URLSafeSerializer
 
-def get_log():
+from airflow import models, settings
+from airflow.api_connexion.exceptions import BadRequest, NotFound
+from airflow.api_connexion.schemas.log_schema import logs_schema
+from airflow.models import DagRun
+from airflow.settings import STORE_SERIALIZED_DAGS
+from airflow.utils.log.log_reader import TaskLogReader
+from airflow.utils.session import provide_session
+
+if os.environ.get('SKIP_DAGS_PARSING') != 'True':
+    dagbag = models.DagBag(settings.DAGS_FOLDER, 
store_serialized_dags=STORE_SERIALIZED_DAGS)
+else:
+    dagbag = models.DagBag(os.devnull, include_examples=False)
+
+
+@provide_session
+def get_log(session, dag_id, dag_run_id, task_id, task_try_number,
+            full_content=False, token=None):
     """
     Get logs for specific task instance
     """
-    raise NotImplementedError("Not implemented yet.")
+    if not token:
+        metadata = {}
+    else:
+        key = current_app.config["SECRET_KEY"]
+        try:
+            metadata = URLSafeSerializer(key).loads(token)
+        except BadSignature:
+            raise BadRequest("Bad Signature. Please sign your token with 
URLSafeSerializer")
+
+    if metadata.get('download_logs', None) and metadata['download_logs']:
+        full_content = True
+
+    if full_content:
+        metadata['download_logs'] = True
+    else:
+        metadata['download_logs'] = False
+
+    task_log_reader = TaskLogReader()
+
+    if not task_log_reader.is_supported:
+        metadata = {"end_of_log": True}
+        return logs_schema.dump(dict(
+            content="[Task log handler does not support read logs.]",
+            continuation_token=str(metadata)
+        ))
+
+    query = session.query(DagRun).filter(DagRun.dag_id == dag_id)
+    dag_run = query.filter(DagRun.run_id == dag_run_id).first()
+    if not dag_run:
+        raise NotFound("Specified DagRun not found")
+
+    ti = dag_run.get_task_instance(task_id, session)
+    if ti is None:
+        metadata['end_of_log'] = True
+        return logs_schema.dump(
+            dict(
+                content=str("[*** Task instance did not exist in the DB\n]"),
+                continuation_token=str(metadata))
+        )
+    try:
+        dag = dagbag.get_dag(dag_id)
+        if dag:
+            ti.task = dag.get_task(ti.task_id)
+
+        if request.headers.get('Content-Type', None) == 'application/json':

Review comment:
   ```suggestion
           if not full_content or request.headers.get('Content-Type', None) == 'application/json':
   ```
   Send JSON whenever the requested content is not the full content, i.e. 
disregard a `text/plain` content type if `metadata['download_logs']` is false. 
This way we conform to the spec. Any thoughts?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to