SAMZA-843 - Slow start of Samza jobs with a large number of containers
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/868cff7b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/868cff7b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/868cff7b Branch: refs/heads/samza-sql Commit: 868cff7b6591b8dd4b413649ca4a92ca30dfd731 Parents: 0bacbbe Author: Navina <[email protected]> Authored: Wed Jan 6 14:15:26 2016 -0800 Committer: Navina <[email protected]> Committed: Wed Jan 6 14:15:26 2016 -0800 ---------------------------------------------------------------------- .../autoscaling/deployer/ConfigManager.java | 5 +- .../apache/samza/container/SamzaContainer.scala | 13 ++- .../samza/coordinator/JobCoordinator.scala | 91 ++++++++++---------- .../samza/coordinator/server/JobServlet.scala | 13 ++- .../main/scala/org/apache/samza/util/Util.scala | 15 +++- .../samza/container/TestSamzaContainer.scala | 49 ++++++++++- .../samza/coordinator/TestJobCoordinator.scala | 17 +++- 7 files changed, 143 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java index 87346bc..e3839ca 100644 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java +++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java @@ -68,6 +68,7 @@ public class ConfigManager { private SystemStreamPartitionIterator coordinatorStreamIterator; private static final Logger log = LoggerFactory.getLogger(ConfigManager.class); private final long defaultPollingInterval = 100; + private final 
int defaultReadJobModelDelayMs = 100; private final long interval; private String coordinatorServerURL = null; private final String jobName; @@ -324,7 +325,7 @@ public class ConfigManager { */ public int getCurrentNumTasks() { int currentNumTasks = 0; - for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL).getContainers().values()) { + for (ContainerModel containerModel : SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values()) { currentNumTasks += containerModel.getTasks().size(); } return currentNumTasks; @@ -337,7 +338,7 @@ public class ConfigManager { * @return current number of containers in the job */ public int getCurrentNumContainers() { - return SamzaContainer.readJobModel(coordinatorServerURL).getContainers().values().size(); + return SamzaContainer.readJobModel(coordinatorServerURL, defaultReadJobModelDelayMs).getContainers().values().size(); } http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index ddce148..e3d0b6c 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -52,8 +52,7 @@ import org.apache.samza.system.chooser.MessageChooserFactory import org.apache.samza.system.chooser.RoundRobinChooserFactory import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskInstanceCollector -import org.apache.samza.util.Logging -import org.apache.samza.util.Util +import org.apache.samza.util.{ExponentialSleepStrategy, Logging, Util} import scala.collection.JavaConversions._ import java.net.{UnknownHostException, InetAddress, URL} import 
org.apache.samza.job.model.{TaskModel, ContainerModel, JobModel} @@ -62,6 +61,8 @@ import org.apache.samza.config.JobConfig.Config2Job import java.lang.Thread.UncaughtExceptionHandler object SamzaContainer extends Logging { + val DEFAULT_READ_JOBMODEL_DELAY_MS = 100 + def main(args: Array[String]) { safeMain(() => new JmxServer, new SamzaContainerExceptionHandler(() => System.exit(1))) } @@ -102,11 +103,15 @@ object SamzaContainer extends Logging { * assignments, and returns objects to be used for SamzaContainer's * constructor. */ - def readJobModel(url: String) = { + def readJobModel(url: String, initialDelayMs: Int = scala.util.Random.nextInt(DEFAULT_READ_JOBMODEL_DELAY_MS)) = { info("Fetching configuration from: %s" format url) SamzaObjectMapper .getObjectMapper - .readValue(Util.read(new URL(url)), classOf[JobModel]) + .readValue( + Util.read( + url = new URL(url), + retryBackoff = new ExponentialSleepStrategy(initialDelayMs = initialDelayMs)), + classOf[JobModel]) } def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = { http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index 112ec1c..06a96ad 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -20,6 +20,8 @@ package org.apache.samza.coordinator +import java.util.concurrent.atomic.AtomicReference + import org.apache.samza.config.StorageConfig import org.apache.samza.job.model.{JobModel, TaskModel} import org.apache.samza.config.Config @@ -55,6 +57,7 @@ object JobCoordinator extends Logging { * a volatile value to store the current 
instantiated <code>JobCoordinator</code> */ @volatile var currentJobCoordinator: JobCoordinator = null + val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]() /** * @param coordinatorSystemConfig A config object that contains job.name, @@ -105,10 +108,12 @@ object JobCoordinator extends Logging { changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, streamMetadataCache: StreamMetadataCache) = { - val jobModelGenerator = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache) + val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache) + jobModelRef.set(jobModel) + val server = new HttpServer - server.addServlet("/*", new JobServlet(jobModelGenerator)) - currentJobCoordinator = new JobCoordinator(jobModelGenerator(), server) + server.addServlet("/*", new JobServlet(jobModelRef)) + currentJobCoordinator = new JobCoordinator(jobModel, server) currentJobCoordinator } @@ -141,15 +146,13 @@ object JobCoordinator extends Logging { } /** - * The method intializes the jobModel and creates a JobModel generator which can be used to generate new JobModels - * which catchup with the latest content from the coordinator stream. + * The method intializes the jobModel and returns it to the caller. + * Note: refreshJobModel can be used as a lambda for JobModel generation in the future. 
*/ private def initializeJobModel(config: Config, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, - streamMetadataCache: StreamMetadataCache): () => JobModel = { - - + streamMetadataCache: StreamMetadataCache): JobModel = { // Do grouping to fetch TaskName to SSP mapping val allSystemStreamPartitions = getInputStreamPartitions(config, streamMetadataCache) val grouper = getSystemStreamPartitionGrouper(config) @@ -195,56 +198,54 @@ object JobCoordinator extends Logging { info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping) changelogManager.writeChangeLogPartitionMapping(newChangelogMapping) } - // Return a jobModelGenerator lambda that can be used to refresh the job model - jobModelGenerator + + jobModel } /** * Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and * builds a new JobModel. - * This method needs to be thread safe, the reason being, for every HTTP request from a container, this method is called - * and underlying it uses the same instance of coordinator stream producer and coordinator stream consumer. + * Note: This method no longer needs to be thread safe because HTTP request from a container no longer triggers a jobmodel + * refresh. Hence, there is no need for synchronization as before. */ private def refreshJobModel(config: Config, allSystemStreamPartitions: util.Set[SystemStreamPartition], groups: util.Map[TaskName, util.Set[SystemStreamPartition]], previousChangelogMapping: util.Map[TaskName, Integer], localityManager: LocalityManager): JobModel = { - this.synchronized + + // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change + // mapping. + var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1) + + // Assign all SystemStreamPartitions to TaskNames. 
+ val taskModels = { - // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change - // mapping. - var maxChangelogPartitionId = previousChangelogMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1) - - // Assign all SystemStreamPartitions to TaskNames. - val taskModels = - { - groups.map - { case (taskName, systemStreamPartitions) => - val changelogPartition = Option(previousChangelogMapping.get(taskName)) match - { - case Some(changelogPartitionId) => new Partition(changelogPartitionId) - case _ => - // If we've never seen this TaskName before, then assign it a - // new changelog. - maxChangelogPartitionId += 1 - info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId)) - new Partition(maxChangelogPartitionId) - } - new TaskModel(taskName, systemStreamPartitions, changelogPartition) - }.toSet - } - - // Here is where we should put in a pluggable option for the - // SSPTaskNameGrouper for locality, load-balancing, etc. - - val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory) - val containerGrouper = containerGrouperFactory.build(config) - val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map - { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap - - new JobModel(config, containerModels, localityManager) + groups.map + { case (taskName, systemStreamPartitions) => + val changelogPartition = Option(previousChangelogMapping.get(taskName)) match + { + case Some(changelogPartitionId) => new Partition(changelogPartitionId) + case _ => + // If we've never seen this TaskName before, then assign it a + // new changelog. + maxChangelogPartitionId += 1 + info("New task %s is being assigned changelog partition %s." 
format(taskName, maxChangelogPartitionId)) + new Partition(maxChangelogPartitionId) + } + new TaskModel(taskName, systemStreamPartitions, changelogPartition) + }.toSet } + + // Here is where we should put in a pluggable option for the + // SSPTaskNameGrouper for locality, load-balancing, etc. + + val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory) + val containerGrouper = containerGrouperFactory.build(config) + val containerModels = asScalaSet(containerGrouper.group(setAsJavaSet(taskModels))).map + { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap + + new JobModel(config, containerModels, localityManager) } private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, streamMetadataCache: StreamMetadataCache) { http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala index a3baddb..5750c2d 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala @@ -20,12 +20,21 @@ package org.apache.samza.coordinator.server +import java.util.concurrent.atomic.AtomicReference + +import org.apache.samza.SamzaException import org.apache.samza.job.model.JobModel import org.apache.samza.util.Logging /** * A servlet that dumps the job model for a Samza job. 
*/ -class JobServlet(jobModelGenerator: () => JobModel) extends ServletBase with Logging { - protected def getObjectToWrite() = jobModelGenerator() +class JobServlet(jobModelRef: AtomicReference[JobModel]) extends ServletBase with Logging { + protected def getObjectToWrite() = { + val jobModel = jobModelRef.get() + if (jobModel == null) { // This should never happen because JobServlet is instantiated only after a jobModel is generated and its reference is updated + throw new SamzaException("Job Model is not defined in the JobCoordinator. This indicates that the Samza job is unstable. Exiting...") + } + jobModel + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 58fbb8f..bd0fe5f 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -119,15 +119,22 @@ object Util extends Logging { } /** + * Overriding read method defined below so that it can be accessed from Java classes with default values + */ + def read(url: URL, timeout: Int): String = { + read(url, timeout, new ExponentialSleepStrategy) + } + + /** * Reads a URL and returns its body as a string. Does no error handling. * * @param url HTTP URL to read from. * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server. + * @param retryBackoff Instance of exponentialSleepStrategy that encapsulates info on how long to sleep and retry operation * @return String payload of the body of the HTTP response. 
*/ - def read(url: URL, timeout: Int = 60000): String = { + def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = { var httpConn = getHttpConnection(url, timeout) - val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy retryBackoff.run(loop => { if(httpConn.getResponseCode != 200) { @@ -143,6 +150,10 @@ object Util extends Logging { }, (exception, loop) => { exception match { + case ioe: IOException => { + warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass) + httpConn = getHttpConnection(url, timeout) + } case e: Exception => loop.done error("Unable to connect to Job coordinator server, received exception", e) http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 365ff0a..9df12d2 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -19,7 +19,9 @@ package org.apache.samza.container +import java.net.SocketTimeoutException import java.util +import java.util.concurrent.atomic.AtomicReference import org.apache.samza.storage.TaskStorageManager import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -30,8 +32,7 @@ import org.apache.samza.Partition import org.apache.samza.config.Config import org.apache.samza.config.MapConfig import org.apache.samza.coordinator.JobCoordinator -import org.apache.samza.coordinator.server.HttpServer -import org.apache.samza.coordinator.server.JobServlet +import org.apache.samza.coordinator.server.{ServletBase, 
HttpServer, JobServlet} import org.apache.samza.job.model.ContainerModel import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskModel @@ -76,7 +77,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { def jobModelGenerator(): JobModel = jobModel val server = new HttpServer val coordinator = new JobCoordinator(jobModel, server) - coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator)) + JobCoordinator.jobModelRef.set(jobModelGenerator()) + coordinator.server.addServlet("/*", new JobServlet(JobCoordinator.jobModelRef)) try { coordinator.start assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString)) @@ -86,6 +88,33 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } @Test + def testReadJobModelWithTimeouts { + val config = new MapConfig(Map("a" -> "b")) + val offsets = new util.HashMap[SystemStreamPartition, String]() + offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1") + val tasks = Map( + new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)), + new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0))) + val containers = Map( + Integer.valueOf(0) -> new ContainerModel(0, tasks), + Integer.valueOf(1) -> new ContainerModel(1, tasks)) + val jobModel = new JobModel(config, containers) + def jobModelGenerator(): JobModel = jobModel + val server = new HttpServer + val coordinator = new JobCoordinator(jobModel, server) + JobCoordinator.jobModelRef.set(jobModelGenerator()) + val mockJobServlet = new MockJobServlet(2, JobCoordinator.jobModelRef) + coordinator.server.addServlet("/*", mockJobServlet) + try { + coordinator.start + assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString)) + } finally { + coordinator.stop + } + assertEquals(2, mockJobServlet.exceptionCount) + } + + @Test def testChangelogPartitions { val config = new MapConfig(Map("a" -> 
"b")) val offsets = new util.HashMap[SystemStreamPartition, String]() @@ -272,3 +301,17 @@ class MockCheckpointManager extends CheckpointManager { override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = { } } + +class MockJobServlet(exceptionLimit: Int, jobModelRef: AtomicReference[JobModel]) extends JobServlet(jobModelRef) { + var exceptionCount = 0 + + override protected def getObjectToWrite() = { + if (exceptionCount < exceptionLimit) { + exceptionCount += 1 + throw new java.io.IOException("Throwing exception") + } else { + val jobModel = jobModelRef.get() + jobModel + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/868cff7b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index 80cccf3..9ab1dd5 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -19,6 +19,8 @@ package org.apache.samza.coordinator +import org.apache.samza.serializers.model.SamzaObjectMapper +import org.apache.samza.util.Util import org.junit.{After, Test} import org.junit.Assert._ import scala.collection.JavaConversions._ @@ -98,10 +100,21 @@ class TestJobCoordinator { MockCoordinatorStreamSystemFactory.enableMockConsumerCache() val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs)) + val expectedJobModel = new JobModel(new MapConfig(config), containers) + + // Verify that the atomicReference is initialized + assertNotNull(JobCoordinator.jobModelRef.get()) + assertEquals(expectedJobModel, JobCoordinator.jobModelRef.get()) + coordinator.start - val jobModel = new JobModel(new MapConfig(config), containers) 
assertEquals(new MapConfig(config), coordinator.jobModel.getConfig) - assertEquals(jobModel, coordinator.jobModel) + assertEquals(expectedJobModel, coordinator.jobModel) + + // Verify that the JobServlet is serving the correct jobModel + val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel]) + assertEquals(expectedJobModel, jobModelFromCoordinatorUrl) + + coordinator.stop } @Test
