Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4e80b5f10 -> 3ece6f6dc


[AIRFLOW-2256] SparkOperator: Add Client Standalone mode and retry mechanism

Closes #3163 from milanvdm/milanvdm/improve-spark-operator
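
For context, a minimal usage sketch of the hook (hedged: the connection id,
its extras, and the application path below are hypothetical, and deploy-mode
is assumed to be configured through the Spark connection's extras):

    from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook

    # 'spark_standalone_client' is an assumed connection whose extras set
    # {"deploy-mode": "client"}; the application path is illustrative.
    hook = SparkSubmitHook(conn_id='spark_standalone_client',
                           name='example_app')
    hook.submit(application='/path/to/app.py')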


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3ece6f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3ece6f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3ece6f6d

Branch: refs/heads/master
Commit: 3ece6f6dcb6ab14249c493c3b1a16ce1c848c29a
Parents: 4e80b5f
Author: milanvdm <milan...@hotmail.com>
Authored: Mon Apr 9 10:20:44 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Mon Apr 9 10:20:44 2018 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/spark_submit_hook.py | 29 ++++++++++++++++++++-----
 1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ece6f6d/airflow/contrib/hooks/spark_submit_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index ae024a9..cc2fee8 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -334,6 +334,9 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         """
         Processes the log files and extracts useful information out of them.
 
+        If the deploy-mode is 'client', log the output of the submit command
+        as those are the output logs of the Spark worker directly.
+
         Remark: If the driver needs to be tracked for its status, the log-level of the
         spark deploy needs to be at least INFO (log4j.logger.org.apache.spark.deploy=INFO)
 
@@ -353,7 +356,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
 
             # If we run Kubernetes cluster mode, we want to extract the driver pod id
             # from the logs so we can kill the application when we stop it unexpectedly
-            if self._is_kubernetes:
+            elif self._is_kubernetes:
                 match = re.search('\s*pod name: ((.+?)-([a-z0-9]+)-driver)', line)
                 if match:
                     self._kubernetes_driver_pod = match.groups()[0]
@@ -368,13 +371,16 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             # if we run in standalone cluster mode and we want to track the driver status
             # we need to extract the driver id from the logs. This allows us to poll for
             # the status using the driver id. Also, we can kill the driver when needed.
-            if self._should_track_driver_status and not self._driver_id:
+            elif self._should_track_driver_status and not self._driver_id:
                 match_driver_id = re.search('(driver-[0-9\-]+)', line)
                 if match_driver_id:
                     self._driver_id = match_driver_id.groups()[0]
                     self.log.info("identified spark driver id: {}"
                                   .format(self._driver_id))
 
+            else:
+                self.log.info(line)
+
             self.log.debug("spark submit log: {}".format(line))
 
     def _process_spark_status_log(self, itr):
@@ -413,6 +419,14 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             ERROR: Unable to run or restart due to an unrecoverable error
             (e.g. missing jar file)
         """
+
+        # When your Spark Standalone cluster is not performing well
+        # due to misconfiguration or heavy load,
+        # it is possible that the polling request will time out.
+        # Therefore we use a simple retry mechanism.
+        missed_job_status_reports = 0
+        max_missed_job_status_reports = 10
+
         # Keep polling as long as the driver is processing
         while self._driver_status not in ["FINISHED", "UNKNOWN",
                                           "KILLED", "FAILED", "ERROR"]:
@@ -434,10 +448,13 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
             returncode = status_process.wait()
 
             if returncode:
-                raise AirflowException(
-                    "Failed to poll for the driver status: returncode = {}"
-                    .format(returncode)
-                )
+                if missed_job_status_reports < max_missed_job_status_reports:
+                    missed_job_status_reports += 1
+                else:
+                    raise AirflowException(
+                        "Failed to poll for the driver status {} times: returncode = {}"
+                        .format(max_missed_job_status_reports, returncode)
+                    )
 
     def _build_spark_driver_kill_command(self):
         """

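A hedged aside on the driver-id extraction in the second hunk above: the
sample log line below is illustrative of spark-submit output against a
standalone master, not captured from a real run.

    import re

    # Assumed sample line from spark-submit in standalone cluster mode.
    line = "Driver successfully submitted as driver-20180409102044-0001"
    match_driver_id = re.search('(driver-[0-9\-]+)', line)
    if match_driver_id:
        print(match_driver_id.groups()[0])  # driver-20180409102044-0001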
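Similarly, a standalone sketch of the bounded-retry polling loop from the
last hunk (hedged: the status command, poll interval, and driverState
parsing here are assumptions, not the hook's exact internals):

    import subprocess
    import time

    def poll_driver_status(status_cmd, max_missed=10, poll_interval=1):
        # Tolerate up to max_missed failed polls (e.g. timeouts against an
        # overloaded standalone master) before giving up.
        missed = 0
        status = None
        while status not in ("FINISHED", "UNKNOWN", "KILLED",
                             "FAILED", "ERROR"):
            time.sleep(poll_interval)
            proc = subprocess.Popen(status_cmd, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            stdout, _ = proc.communicate()
            if proc.returncode:
                missed += 1
                if missed > max_missed:
                    raise RuntimeError(
                        "Failed to poll for the driver status {} times: "
                        "returncode = {}".format(max_missed, proc.returncode))
                continue
            for line in stdout.decode('utf-8').splitlines():
                # Assumed status-output format: '"driverState" : "RUNNING"'
                if "driverState" in line:
                    status = (line.split(':')[-1].replace(',', '')
                              .replace('"', '').strip())
        return status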