This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new da6c28b  SAMZA-2165: Account for coordinator restarts in calls to status
da6c28b is described below

commit da6c28be82c04a00cafff3360922cd89435e6222
Author: Jagadish <jvenkatra...@linkedin.com>
AuthorDate: Mon Apr 15 17:30:34 2019 -0700

    SAMZA-2165: Account for coordinator restarts in calls to status
    
    Currently, the status of a Samza job is determined by a combination of:
    1. Obtaining YARN's status for the job by querying the RM
    2. Obtaining the AM/coordinator URL for the job
    3. If (1) is "Running", querying the coordinator URL to check whether all containers have started
    
    YARN may restart the coordinator between (2) and (3), in which case the old coordinator process may no longer be alive, triggering a ConnectException in (3). This causes the status call to fail.
    
    A better alternative to handle these retriable errors is to return a "New" status from the API, so that applications can continue polling for status.
    
    Author: Jagadish <jvenkatra...@linkedin.com>
    
    Reviewers: Prateek M<pmahe...@linkedin.com>
    
    Closes #995 from vjagadish1989/yarnjob-fix
---
 .../scala/org/apache/samza/job/yarn/ClientHelper.scala |  6 ++++++
 .../org/apache/samza/job/yarn/TestClientHelper.scala   | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index 6f65ec4..c937523 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -350,6 +350,12 @@ class ClientHelper(conf: Configuration) extends Logging {
       } else {
         false
       }
+    } catch {
+      case e: IOException => {
+        // ignore any exceptions when querying the AM - likely due to YARN restarting the AM process
+        warn("Exception when querying AM metrics", e)
+        false
+      }
     } finally {
       amClient.close()
     }
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
index 6df6589..f85bb56 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.job.yarn
 
+import java.net.ConnectException
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -48,6 +50,22 @@ class TestClientHelper extends FunSuite {
     }
   }
 
+  test("test allContainersRunning is false when there are ConnectExceptions") {
+    val exceptionThrowingClientHelper = new ClientHelper(hadoopConfig) {
+      override def createYarnClient() = {
+        mock[YarnClient]
+      }
+      override def createAmClient(applicationReport: ApplicationReport) = {
+        val amClient = mock[ApplicationMasterRestClient]
+        when(amClient.getMetrics).thenThrow(new ConnectException())
+        amClient
+      }
+    }
+
+    val allContainersRunning = exceptionThrowingClientHelper.allContainersRunning(null)
+    assertEquals(false, allContainersRunning)
+  }
+
   test("test validateJobConfig") {
     import collection.JavaConverters._
     var config = new MapConfig()

Reply via email to