Taragolis commented on code in PR #34225:
URL: https://github.com/apache/airflow/pull/34225#discussion_r1486618553
##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,111 @@ def get_log_uri(
return None
log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+ """Helper class for constructing Amazon EMR Serverless link to Spark
stdout logs."""
+
+ name = "Spark Driver stdout"
+ key = "emr_serverless_logs"
+
+ def get_link(
+ self,
+ operator: BaseOperator,
+ *,
+ ti_key: TaskInstanceKey,
+ ) -> str:
+ """
+ Pre-signed URL to the Spark stdout log.
+
+ :param operator: airflow operator
+ :param ti_key: TaskInstance ID to return link for
+ :return: Pre-signed URL to Spark stdout log. Empty string if no Spark
stdout log is available.
+ """
+ conf = XCom.get_value(key=self.key, ti_key=ti_key)
+ if not conf:
+ return ""
+ # If get_dashboard_for_job_run fails for whatever reason, fail after 1
attempt
+ # so that the rest of the links load in a reasonable time frame.
+ hook = EmrServerlessHook(
+ aws_conn_id=conf.get("conn_id"), config={"retries":
{"total_max_attempts": 1}}
+ )
+ resp = hook.conn.get_dashboard_for_job_run(
+ applicationId=conf.get("application_id"),
jobRunId=conf.get("job_run_id")
+ )
+ o = urlparse(resp["url"])
+ return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
+
+
+class EmrServerlessDashboardLink(BaseAwsLink):
+ """Helper class for constructing Amazon EMR Serverless Dashboard Link."""
+
+ name = "EMR Serverless Dashboard"
+ key = "emr_serverless_dashboard"
+
+ def get_link(
+ self,
+ operator: BaseOperator,
+ *,
+ ti_key: TaskInstanceKey,
+ ) -> str:
+ """
+ Pre-signed URL to the application UI for the EMR Serverless job.
+
+ :param operator: airflow operator
+ :param ti_key: TaskInstance ID to return link for
+ :return: Pre-signed URL to application UI.
+ """
+ conf = XCom.get_value(key=self.key, ti_key=ti_key)
+ if not conf:
+ return ""
+ # If get_dashboard_for_job_run fails for whatever reason, fail after 1
attempt
+ # so that the rest of the links load in a reasonable time frame.
+ hook = EmrServerlessHook(
+ aws_conn_id=conf.get("conn_id"), config={"retries":
{"total_max_attempts": 1}}
+ )
+ # Dashboard cannot be served when job is pending/scheduled,
+ # in which case an empty string still gets returned.
+ resp = hook.conn.get_dashboard_for_job_run(
+ applicationId=conf.get("application_id"),
jobRunId=conf.get("job_run_id")
+ )
+ return resp["url"]
Review Comment:
Same here
##########
airflow/providers/amazon/aws/links/emr.py:
##########
@@ -66,3 +82,111 @@ def get_log_uri(
return None
log_uri = S3Hook.parse_s3_url(cluster_info["LogUri"])
return "/".join(log_uri)
+
+
+class EmrServerlessLogsLink(BaseAwsLink):
+ """Helper class for constructing Amazon EMR Serverless link to Spark
stdout logs."""
+
+ name = "Spark Driver stdout"
+ key = "emr_serverless_logs"
+
+ def get_link(
+ self,
+ operator: BaseOperator,
+ *,
+ ti_key: TaskInstanceKey,
+ ) -> str:
+ """
+ Pre-signed URL to the Spark stdout log.
+
+ :param operator: airflow operator
+ :param ti_key: TaskInstance ID to return link for
+ :return: Pre-signed URL to Spark stdout log. Empty string if no Spark
stdout log is available.
+ """
+ conf = XCom.get_value(key=self.key, ti_key=ti_key)
+ if not conf:
+ return ""
+ # If get_dashboard_for_job_run fails for whatever reason, fail after 1
attempt
+ # so that the rest of the links load in a reasonable time frame.
+ hook = EmrServerlessHook(
+ aws_conn_id=conf.get("conn_id"), config={"retries":
{"total_max_attempts": 1}}
+ )
+ resp = hook.conn.get_dashboard_for_job_run(
+ applicationId=conf.get("application_id"),
jobRunId=conf.get("job_run_id")
+ )
+ o = urlparse(resp["url"])
+ return o._replace(path="/logs/SPARK_DRIVER/stdout.gz").geturl()
Review Comment:
I guess these parts could be moved into the `format_link` method rather than
redefining the entire `get_link` method. The original `get_link` suppresses any
errors (except subclasses of BaseException):
https://github.com/apache/airflow/blob/16ffc87ff42f702221f6cb7e42e08bc208183cf1/airflow/providers/amazon/aws/links/base_aws.py#L64-L65
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]