Taragolis commented on code in PR #34225:
URL: https://github.com/apache/airflow/pull/34225#discussion_r1486618553


##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,111 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
class EmrServerlessLogsLink(BaseAwsLink):
    """Helper class for constructing Amazon EMR Serverless link to Spark stdout logs."""

    name = "Spark Driver stdout"
    key = "emr_serverless_logs"

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey,
    ) -> str:
        """
        Pre-signed URL to the Spark stdout log.

        :param operator: airflow operator
        :param ti_key: TaskInstance ID to return link for
        :return: Pre-signed URL to Spark stdout log. Empty string if no Spark stdout log is available.
        """
        link_params = XCom.get_value(key=self.key, ti_key=ti_key)
        if not link_params:
            return ""
        # Cap retries at a single attempt: if get_dashboard_for_job_run is going
        # to fail anyway, fail fast so the remaining links render promptly.
        emr_serverless = EmrServerlessHook(
            aws_conn_id=link_params.get("conn_id"),
            config={"retries": {"total_max_attempts": 1}},
        )
        dashboard = emr_serverless.conn.get_dashboard_for_job_run(
            applicationId=link_params.get("application_id"),
            jobRunId=link_params.get("job_run_id"),
        )
        # Keep the signed host/query from the dashboard URL, swapping only the
        # path so it points at the driver's stdout log.
        parsed = urlparse(dashboard["url"])
        return parsed._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
+
+
class EmrServerlessDashboardLink(BaseAwsLink):
    """Helper class for constructing Amazon EMR Serverless Dashboard Link."""

    name = "EMR Serverless Dashboard"
    key = "emr_serverless_dashboard"

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey,
    ) -> str:
        """
        Pre-signed URL to the application UI for the EMR Serverless job.

        :param operator: airflow operator
        :param ti_key: TaskInstance ID to return link for
        :return: Pre-signed URL to application UI.
        """
        stored = XCom.get_value(key=self.key, ti_key=ti_key)
        if not stored:
            return ""
        # A single API attempt keeps link rendering snappy when the
        # get_dashboard_for_job_run call is destined to fail anyway.
        hook = EmrServerlessHook(
            aws_conn_id=stored.get("conn_id"),
            config={"retries": {"total_max_attempts": 1}},
        )
        # While the job is still pending/scheduled there is no dashboard to
        # serve; the service hands back an empty string, which we pass through.
        response = hook.conn.get_dashboard_for_job_run(
            applicationId=stored.get("application_id"),
            jobRunId=stored.get("job_run_id"),
        )
        return response["url"]

Review Comment:
   Same here



##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,111 @@ def get_log_uri(
         return None
     log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
     return "/".join(log_uri)
+
+
class EmrServerlessLogsLink(BaseAwsLink):
    """Helper class for constructing Amazon EMR Serverless link to Spark stdout logs."""

    name = "Spark Driver stdout"
    key = "emr_serverless_logs"

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey,
    ) -> str:
        """
        Pre-signed URL to the Spark stdout log.

        :param operator: airflow operator
        :param ti_key: TaskInstance ID to return link for
        :return: Pre-signed URL to Spark stdout log. Empty string if no Spark stdout log is available.
        """
        link_params = XCom.get_value(key=self.key, ti_key=ti_key)
        if not link_params:
            return ""
        # Cap retries at a single attempt: if get_dashboard_for_job_run is going
        # to fail anyway, fail fast so the remaining links render promptly.
        emr_serverless = EmrServerlessHook(
            aws_conn_id=link_params.get("conn_id"),
            config={"retries": {"total_max_attempts": 1}},
        )
        dashboard = emr_serverless.conn.get_dashboard_for_job_run(
            applicationId=link_params.get("application_id"),
            jobRunId=link_params.get("job_run_id"),
        )
        # Keep the signed host/query from the dashboard URL, swapping only the
        # path so it points at the driver's stdout log.
        parsed = urlparse(dashboard["url"])
        return parsed._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()

Review Comment:
   I guess this part could be moved into the `format_link` method rather than redefining the entire `get_link` method. The original `get_link` suppresses any errors (except subclasses of BaseException):
   
   
https://github.com/apache/airflow/blob/16ffc87ff42f702221f6cb7e42e08bc208183cf1/airflow/providers/amazon/aws/links/base_aws.py#L64-L65



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to