Repository: samza
Updated Branches:
refs/heads/master 9d904b157 -> d493f9a46
SAMZA-1261: Fix TestProcessJob flaky test
- Fix the flaky test `TestProcessJob.testProcessJobKillShouldWork`, which was
failing intermittently due to a race condition. In particular, the test thread
could assert `jobModelManager.stopped` before the thread spawned by
`ProcessJob.submit` had actually invoked `jobModelManager.stop`.
+ Refactor `ProcessJob` to improve its overall robustness
+ Handle corner cases, e.g.
+ Fail gracefully if starting the process within `ProcessJob.submit` throws
+ Ignore attempts to kill a job before it is submitted
+ Ensure job status is always set appropriately
+ Remove unnecessary stdout/stderr piping code
+ Employ `wait`/`notify` instead of polling with `Thread.sleep`
+ Eliminate all artificial waits in unit tests that were intended to influence
inter-thread execution order
+ Add more unit tests
Author: Ahmed Abdul Hamid <[email protected]>
Reviewers: Boris S <[email protected]>, Shanthoosh V <[email protected]>
Closes #485 from ahmedahamid/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d493f9a4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d493f9a4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d493f9a4
Branch: refs/heads/master
Commit: d493f9a46e92dc528cac718310dd4623e7d9cd44
Parents: 9d904b1
Author: Ahmed Abdul Hamid <[email protected]>
Authored: Mon May 7 13:45:53 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon May 7 13:45:53 2018 -0700
----------------------------------------------------------------------
.../org/apache/samza/job/local/ProcessJob.scala | 167 ++++++++++---------
.../apache/samza/job/local/TestProcessJob.scala | 133 ++++++++++++---
2 files changed, 198 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d493f9a4/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
----------------------------------------------------------------------
diff --git
a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
index bc2d74b..f719220 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
@@ -19,113 +19,130 @@
package org.apache.samza.job.local
-import java.io.{InputStream, OutputStream}
import java.util.concurrent.CountDownLatch
-import org.apache.samza.SamzaException
import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.job.ApplicationStatus.{New, Running,
UnsuccessfulFinish}
+import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish,
UnsuccessfulFinish}
import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
import org.apache.samza.util.Logging
import scala.collection.JavaConverters._
-class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator:
JobModelManager) extends StreamJob with Logging {
- var jobStatus: Option[ApplicationStatus] = None
- var process: Process = null
-
- def submit: StreamJob = {
- jobStatus = Some(New)
- val waitForThreadStart = new CountDownLatch(1)
+object ProcessJob {
+ private def createProcessBuilder(commandBuilder: CommandBuilder):
ProcessBuilder = {
val processBuilder = new
ProcessBuilder(commandBuilder.buildCommand.split(" ").toList.asJava)
+ processBuilder.environment.putAll(commandBuilder.buildEnvironment)
+
+ // Pipe all output to this process's streams.
+ processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT)
+ processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT)
processBuilder
- .environment
- .putAll(commandBuilder.buildEnvironment)
+ }
+}
+
+class ProcessJob(commandBuilder: CommandBuilder, val jobModelManager:
JobModelManager) extends StreamJob with Logging {
+
+ import ProcessJob._
+
+ val lock = new Object
+ val processBuilder: ProcessBuilder = createProcessBuilder(commandBuilder)
+ var jobStatus: ApplicationStatus = New
+ var processThread: Option[Thread] = None
+
+
+ def submit: StreamJob = {
+ val threadStartCountDownLatch = new CountDownLatch(1)
- // create a non-daemon thread to make job runner block until the job
finishes.
- // without this, the proc dies when job runner ends.
- val procThread = new Thread {
+ // Create a non-daemon thread to make job runner block until the job
finishes.
+ // Without this, the proc dies when job runner ends.
+ processThread = Some(new Thread {
override def run {
- process = processBuilder.start
-
- // pipe all output to this process's streams
- val outThread = new Thread(new Piper(process.getInputStream,
System.out))
- val errThread = new Thread(new Piper(process.getErrorStream,
System.err))
- outThread.setDaemon(true)
- errThread.setDaemon(true)
- outThread.start
- errThread.start
- waitForThreadStart.countDown
- process.waitFor
- jobCoordinator.stop
+ var processExitCode = -1
+ var process: Option[Process] = None
+
+ setStatus(Running)
+
+ try {
+ threadStartCountDownLatch.countDown
+ process = Some(processBuilder.start)
+ processExitCode = process.get.waitFor
+ } catch {
+ case _: InterruptedException => process foreach { p =>
p.destroyForcibly }
+ case e: Exception => error("Encountered an error during job start:
%s".format(e.getMessage))
+ } finally {
+ jobModelManager.stop
+ setStatus(if (processExitCode == 0) SuccessfulFinish else
UnsuccessfulFinish)
+ }
}
- }
+ })
+
+ info("Starting process job")
- procThread.start
- waitForThreadStart.await
- jobStatus = Some(Running)
+ processThread.get.start
+ threadStartCountDownLatch.await
ProcessJob.this
}
def kill: StreamJob = {
- process.destroyForcibly
- jobStatus = Some(UnsuccessfulFinish)
- ProcessJob.this
- }
+ getStatus match {
+ case Running => {
+ info("Attempting to kill running process job")
- def waitForFinish(timeoutMs: Long) = {
- val thread = new Thread {
- setDaemon(true)
- override def run {
- try {
- process.waitFor
- } catch {
- case e: InterruptedException => info("Got interrupt.", e)
+ processThread foreach { thread =>
+ thread.interrupt
+ thread.join
+
+ info("Process job killed successfully")
}
}
+ case status => warn("Ignoring attempt to kill a process job that is not
running. Job status is %s".format(status))
}
- thread.start
- thread.join(timeoutMs)
- thread.interrupt
- jobStatus.getOrElse(null)
+ ProcessJob.this
}
- def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = {
- val start = System.currentTimeMillis
+ def waitForFinish(timeoutMs: Long): ApplicationStatus = {
+ require(timeoutMs >= 0, "Timeout values must be non-negative")
- while (System.currentTimeMillis - start < timeoutMs && status !=
jobStatus) {
- Thread.sleep(500)
- }
-
- jobStatus.getOrElse(null)
+ processThread foreach { thread => thread.join(timeoutMs) }
+ getStatus
}
- def getStatus = jobStatus.getOrElse(null)
-}
+ def waitForStatus(status: ApplicationStatus, timeoutMs: Long):
ApplicationStatus = lock.synchronized {
+ require(timeoutMs >= 0, "Timeout values must be non-negative")
-/**
- * Silly class to forward bytes from one stream to another. Using this to pipe
- * output from subprocess to this process' stdout/stderr.
- */
-class Piper(in: InputStream, out: OutputStream) extends Runnable {
- def run() {
- try {
- val b = new Array[Byte](512)
- var read = 1;
- while (read > -1) {
- read = in.read(b, 0, b.length)
- if (read > -1) {
- out.write(b, 0, read)
- out.flush()
+ timeoutMs match {
+ case 0 => {
+ info("Waiting for application status %s indefinitely".format(status))
+
+ while (getStatus != status) lock.wait(0)
+ }
+ case _ => {
+ info("Waiting for application status %s for %d ms".format(status,
timeoutMs))
+
+ val startTimeMs = System.currentTimeMillis
+ var remainingTimeoutMs = timeoutMs
+
+ while (getStatus != status && remainingTimeoutMs > 0) {
+ lock.wait(remainingTimeoutMs)
+
+ val elapsedWaitTimeMs = System.currentTimeMillis - startTimeMs
+ remainingTimeoutMs = timeoutMs - elapsedWaitTimeMs
}
}
- } catch {
- case e: Exception => throw new SamzaException("Broken pipe", e);
- } finally {
- in.close()
- out.close()
}
+ getStatus
+ }
+
+ def getStatus: ApplicationStatus = lock.synchronized {
+ jobStatus
+ }
+
+ private def setStatus(status: ApplicationStatus): Unit = lock.synchronized {
+ info("Changing process job status from %s to %s".format(jobStatus, status))
+
+ jobStatus = status
+ lock.notify
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d493f9a4/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git
a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index 58ecf99..dc87583 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -20,49 +20,128 @@
package org.apache.samza.job.local
import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish,
UnsuccessfulFinish}
+import org.apache.samza.job.CommandBuilder
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.job.ApplicationStatus
-import org.apache.samza.job.CommandBuilder
+
import scala.collection.JavaConverters._
-class TestProcessJob {
- @Test
- def testProcessJobShouldFinishOnItsOwn {
+object TestProcessJob {
+
+ val OneSecondCommand = "sleep 1"
+ val TenSecondCommand = "sleep 10"
+ val SimpleCommand = "true"
+ val FailingCommand = "false"
+ val BadCommand = "bad-non-existing-command"
+
+ private def createProcessJob(command: String): ProcessJob = {
val commandBuilder = new CommandBuilder {
- override def buildCommand = "sleep 1"
+ override def buildCommand = command
+
override def buildEnvironment = Map[String, String]().asJava
}
- val coordinator = new MockJobModelManager()
- val job = new ProcessJob(commandBuilder, coordinator)
- job.submit
- job.waitForFinish(999999)
+ new ProcessJob(commandBuilder, new MockJobModelManager)
}
- // TODO: fix in SAMZA-1261
- // @Test
- def testProcessJobKillShouldWork {
- val commandBuilder = new CommandBuilder {
- override def buildCommand = "sleep 999999999"
- override def buildEnvironment = Map[String, String]().asJava
- }
- val coordinator = new MockJobModelManager()
- val job = new ProcessJob(commandBuilder, coordinator)
- job.submit
- job.waitForFinish(500)
- job.kill
- job.waitForFinish(999999)
- assertTrue(coordinator.stopped)
- assertEquals(ApplicationStatus.UnsuccessfulFinish,
job.waitForFinish(999999999))
+ private def getMockJobModelManager(processJob: ProcessJob):
MockJobModelManager = {
+ processJob.jobModelManager.asInstanceOf[MockJobModelManager]
+ }
+}
+
+class TestProcessJob {
+
+ import TestProcessJob._
+
+ @Test
+ def testProcessJobShouldFinishOnItsOwn: Unit = {
+ val processJob = createProcessJob(SimpleCommand)
+
+ val status = processJob.submit.waitForFinish(0)
+
+ assertEquals(SuccessfulFinish, status)
+ assertTrue(getMockJobModelManager(processJob).stopped)
+ }
+
+ @Test
+ def testProcessJobShouldReportFailingCommands: Unit = {
+ val processJob = createProcessJob(FailingCommand)
+
+ val status = processJob.submit.waitForFinish(0)
+
+ assertEquals(UnsuccessfulFinish, status)
+ assertTrue(getMockJobModelManager(processJob).stopped)
+ }
+
+ @Test
+ def testProcessJobWaitForFinishShouldTimeOut: Unit = {
+ val processJob = createProcessJob(OneSecondCommand)
+
+ // Wait for a shorter duration than that necessary for the specified
command to complete.
+ val status = processJob.submit.waitForFinish(10)
+
+ assertEquals(Running, status)
+ }
+
+ @Test
+ def testProcessJobKillShouldWork: Unit = {
+ val processJob = createProcessJob(TenSecondCommand)
+
+ processJob.submit.kill
+
+ assertEquals(UnsuccessfulFinish, processJob.getStatus)
+ assertTrue(getMockJobModelManager(processJob).stopped)
+ }
+
+ @Test
+ def testProcessJobSubmitBadProcessShouldFailGracefully: Unit = {
+ val processJob = createProcessJob(BadCommand)
+
+ processJob.submit.waitForFinish(0)
+
+ assertEquals(UnsuccessfulFinish, processJob.getStatus)
+ assertTrue(getMockJobModelManager(processJob).stopped)
+ }
+
+ @Test
+ def testProcessJobWaitForStatusShouldWork: Unit = {
+ val processJob = createProcessJob(SimpleCommand)
+
+ processJob.submit.waitForStatus(SuccessfulFinish, 0)
+
+ assertEquals(SuccessfulFinish, processJob.getStatus)
+ assertTrue(getMockJobModelManager(processJob).stopped)
+ }
+
+ @Test
+ def testProcessJobWaitForStatusShouldTimeOut: Unit = {
+ val processJob = createProcessJob(OneSecondCommand)
+
+ // Wait for a shorter duration than that necessary for the specified
command to complete.
+ val status = processJob.submit.waitForStatus(SuccessfulFinish, 10)
+
+ assertEquals(Running, status)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testProcessJobWaitForStatusShouldThrowOnNegativeTimeout: Unit = {
+ val processJob = createProcessJob(SimpleCommand)
+ processJob.waitForStatus(SuccessfulFinish, -1)
+ }
+
+ @Test(expected = classOf[IllegalArgumentException])
+ def testProcessJobWaitForFinishShouldThrowOnNegativeTimeout: Unit = {
+ val processJob = createProcessJob(SimpleCommand)
+ processJob.waitForFinish(-1)
}
}
class MockJobModelManager extends JobModelManager(null, null) {
var stopped: Boolean = false
- override def start: Unit = { }
+ override def start: Unit = {}
override def stop: Unit = {
- stopped = true;
+ stopped = true
}
}