[ 
https://issues.apache.org/jira/browse/AIRFLOW-6214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16994238#comment-16994238
 ] 

Albertus Kelvin commented on AIRFLOW-6214:
------------------------------------------

Hi [~dennisli], thanks for your comment. I really appreciate it.

Just FYI, I set up the connection via environment variables and provided the 
URI, but I think the same applies to connections stored in the DB as well.

I investigated the *Connection* module (airflow.models.connection) further and 
found that if we provide a URI (e.g. spark://host:port), the attributes 
are derived by parsing the URI.

When parsing the host 
([code|https://github.com/apache/airflow/blob/master/airflow/models/connection.py#L137]),
 the resulting value is only the hostname, without the scheme.

Therefore, *conn.host* in the following code will only contain the hostname.

{code:python}
conn = self.get_connection(self._conn_id)
if conn.port:
    conn_data['master'] = "{}:{}".format(conn.host, conn.port)
else:
    conn_data['master'] = conn.host
{code}
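
The parsing behaviour described above can be reproduced with the standard library alone. This is only a minimal sketch, assuming the *Connection* module relies on urlparse-style splitting; *split_conn_uri* and the example URI are hypothetical names used for illustration, not Airflow code.

```python
from urllib.parse import urlparse


def split_conn_uri(uri):
    """Split a connection URI into (scheme, host, port).

    Hypothetical helper for illustration only; it is not part of Airflow.
    """
    parts = urlparse(uri)
    # parts.hostname never includes the scheme or the port
    return parts.scheme, parts.hostname, parts.port


scheme, host, port = split_conn_uri("spark://spark-master:7077")
print(scheme, host, port)
```

The point is that the scheme ends up in its own field, so a master address built only from the host and port no longer carries the *spark://* prefix.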

Since *conn* consists of several attributes, including scheme, host, and port, 
I think *conn_data['master']* should be resolved like this:

{code:python}
conn = self.get_connection(self._conn_id)
if conn.port:
    conn_data['master'] = "{}://{}:{}".format(
        conn.conn_type, conn.host, conn.port)
else:
    conn_data['master'] = "{}://{}".format(conn.conn_type, conn.host)
{code}
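
To show the effect of the two variants on the driver status check, here is a small standalone sketch. *FakeConn*, *resolve_master_current*, *resolve_master_proposed* and *should_track_driver_status* are hypothetical stand-ins written for this comment, and it assumes *conn_type* holds the URI scheme (here "spark"); the real hook methods may differ.

```python
from collections import namedtuple

# Hypothetical stand-in for airflow.models.Connection with only the fields used here.
FakeConn = namedtuple("FakeConn", ["conn_type", "host", "port"])


def resolve_master_current(conn):
    """Mirror the current logic: host and port only, no scheme."""
    if conn.port:
        return "{}:{}".format(conn.host, conn.port)
    return conn.host


def resolve_master_proposed(conn):
    """Mirror the proposed logic: prefix the master with the connection type."""
    if conn.port:
        return "{}://{}:{}".format(conn.conn_type, conn.host, conn.port)
    return "{}://{}".format(conn.conn_type, conn.host)


def should_track_driver_status(master, deploy_mode):
    """Same condition as the check quoted in the issue description."""
    return "spark://" in master and deploy_mode == "cluster"


# Assumption: conn_type carries the scheme parsed from the URI.
conn = FakeConn(conn_type="spark", host="spark-master", port=7077)
print(should_track_driver_status(resolve_master_current(conn), "cluster"))
print(should_track_driver_status(resolve_master_proposed(conn), "cluster"))
```

With the current resolution the check cannot succeed because the master never contains *spark://*; with the proposed one it succeeds for a standalone cluster in cluster deploy mode.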

Regarding your note that the scheme should be put in the *host* (as in 
the unit test), I think that does not fit how the *Connection* 
module works. It might also raise exceptions, since the 
*Connection* table has dedicated columns for *scheme* and *host*. Moreover, I 
didn't find any method that parses the scheme from the host.

What do you think?

> Spark driver status tracking for standalone, YARN, Mesos and K8s with cluster 
> deploy mode
> -----------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-6214
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6214
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: hooks, operators
>    Affects Versions: 1.10.6
>            Reporter: Albertus Kelvin
>            Assignee: xifeng
>            Priority: Minor
>
> Based on the following code snippet:
> {code:python}
> def _resolve_should_track_driver_status(self):
>         return ('spark://' in self._connection['master'] and
>                 self._connection['deploy_mode'] == 'cluster')
> {code}
>  
> It seems that the above code will always return *False* because the master 
> address for a standalone cluster doesn't contain *spark://*, as shown in the 
> code snippet below.
> {code:python}
> conn = self.get_connection(self._conn_id)
> if conn.port:
>     conn_data['master'] = "{}:{}".format(conn.host, conn.port)
> else:
>     conn_data['master'] = conn.host
> {code}
> Additionally, I think this driver status tracker should also be enabled for 
> mesos and kubernetes with cluster mode since the *--status* argument supports 
> all of these cluster managers. Refer to 
> [this|https://github.com/apache/spark/blob/be867e8a9ee8fc5e4831521770f51793e9265550/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala#L543].
> For YARN cluster mode, I think we can use built-in commands from yarn itself, 
> such as *yarn application -status <ApplicationID>*.
> Therefore, the *_build_track_driver_status_command* method should be updated 
> accordingly to accommodate such a need, such as the following.
> {code:python}
> def _build_track_driver_status_command(self):
>         # The driver id so we can poll for its status
>         if not self._driver_id:
>             raise AirflowException(
>                     "Invalid status: attempted to poll driver " +
>                     "status but no driver id is known. Giving up.")
>         if self._connection['master'].startswith(
>                 ("spark://", "mesos://", "k8s://")):
>             # standalone, mesos, kubernetes
>             connection_cmd = self._get_spark_binary_path()
>             connection_cmd += ["--master", self._connection['master']]
>             connection_cmd += ["--status", self._driver_id]
>         else:
>             # yarn
>             connection_cmd = ["yarn", "application", "-status"]
>             connection_cmd += [self._driver_id]
>         self.log.debug("Poll driver status cmd: %s", connection_cmd)
>         return connection_cmd
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
