This is an automated email from the ASF dual-hosted git repository. jagadish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new da6c28b SAMZA-2165: Account for coordinator restarts in calls to status da6c28b is described below commit da6c28be82c04a00cafff3360922cd89435e6222 Author: Jagadish <jvenkatra...@linkedin.com> AuthorDate: Mon Apr 15 17:30:34 2019 -0700 SAMZA-2165: Account for coordinator restarts in calls to status Currently, the status of a Samza job is determined by a combination of the following steps: 1. Obtain YARN's status for the job by querying the RM 2. Obtain the AM/coordinator URL for the job 3. If (1) is "Running", query the coordinator URL to check whether all containers have started YARN may restart the coordinator between (2) and (3), and the old coordinator process may no longer be alive, triggering a ConnectException in (3). This causes the status call to fail. A better alternative for handling these retriable errors is to return a "New" status from the API, so that applications can continue polling for status. Author: Jagadish <jvenkatra...@linkedin.com> Reviewers: Prateek M <pmahe...@linkedin.com> Closes #995 from vjagadish1989/yarnjob-fix --- .../scala/org/apache/samza/job/yarn/ClientHelper.scala | 6 ++++++ .../org/apache/samza/job/yarn/TestClientHelper.scala | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala index 6f65ec4..c937523 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala @@ -350,6 +350,12 @@ class ClientHelper(conf: Configuration) extends Logging { } else { false } + } catch { + case e: IOException => { + // ignore any exceptions when querying the AM - likely due to YARN restarting the AM process + warn("Exception when querying AM metrics", e) + false + } } finally { amClient.close() } diff --git 
a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala index 6df6589..f85bb56 100644 --- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala +++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala @@ -18,6 +18,8 @@ */ package org.apache.samza.job.yarn +import java.net.ConnectException + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} @@ -48,6 +50,22 @@ class TestClientHelper extends FunSuite { } } + test("test allContainersRunning is false when there are ConnectExceptions") { + val exceptionThrowingClientHelper = new ClientHelper(hadoopConfig) { + override def createYarnClient() = { + mock[YarnClient] + } + override def createAmClient(applicationReport: ApplicationReport) = { + val amClient = mock[ApplicationMasterRestClient] + when(amClient.getMetrics).thenThrow(new ConnectException()) + amClient + } + } + + val allContainersRunning = exceptionThrowingClientHelper.allContainersRunning(null) + assertEquals(false, allContainersRunning) + } + test("test validateJobConfig") { import collection.JavaConverters._ var config = new MapConfig()