http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
new file mode 100644
index 0000000..3d1560f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * Specification of a Request for resources from a ClusterResourceManager. A
+ * resource request currently includes cpu cores and memory in MB. A preferred host
+ * can also be specified with a request.
+ *
+ * When used with ordered data structures (for example, priority queues),
+ * ordering between two SamzaResourceRequests is defined by their timestamp.
+ * This ordering is not consistent with {@link Object#equals(Object)}: two distinct
+ * requests created in the same millisecond compare as equal.
+ *
+ * //TODO: Define a SamzaResourceRequestBuilder API as specified in SAMZA-881
+ */
+public class SamzaResourceRequest implements Comparable<SamzaResourceRequest> {
+  private static final Logger log = LoggerFactory.getLogger(SamzaResourceRequest.class);
+
+  /**
+   * Specifications of a resource request.
+   */
+  private final int numCores;
+
+  private final int memoryMB;
+  /**
+   * The preferred host on which the resource must be allocated. Can be set to
+   * ContainerRequestState.ANY_HOST if there are no host preferences
+   */
+  private final String preferredHost;
+  /**
+   * A request is identified by a unique identifier.
+   */
+  private final String requestID;
+  /**
+   * The ID of the StreamProcessor which this request is for.
+   */
+  private final int containerID;
+
+  /**
+   * The timestamp in millis when the request was created.
+   */
+  private final long requestTimestampMs;
+
+  /**
+   * Creates a request identified by a random UUID and stamped with the current wall-clock time.
+   *
+   * @param numCores            number of CPU cores requested
+   * @param memoryMB            memory requested, in MB
+   * @param preferredHost       host on which the resource should be allocated
+   * @param expectedContainerID ID of the StreamProcessor this request is for
+   */
+  public SamzaResourceRequest(int numCores, int memoryMB, String preferredHost, int expectedContainerID) {
+    this.numCores = numCores;
+    this.memoryMB = memoryMB;
+    this.preferredHost = preferredHost;
+    this.requestID = UUID.randomUUID().toString();
+    this.containerID = expectedContainerID;
+    this.requestTimestampMs = System.currentTimeMillis();
+    log.info("Resource Request created for {} on {} at {}", new Object[] {this.containerID, this.preferredHost, this.requestTimestampMs});
+  }
+
+  public int getContainerID() {
+    return containerID;
+  }
+
+  public long getRequestTimestampMs() {
+    return requestTimestampMs;
+  }
+
+  public String getRequestID() {
+    return requestID;
+  }
+
+  public int getNumCores() {
+    return numCores;
+  }
+
+  public String getPreferredHost() {
+    return preferredHost;
+  }
+
+  public int getMemoryMB() {
+    return memoryMB;
+  }
+
+  @Override
+  public String toString() {
+    return "SamzaResourceRequest{" +
+        "numCores=" + numCores +
+        ", memoryMB=" + memoryMB +
+        ", preferredHost='" + preferredHost + '\'' +
+        ", requestID='" + requestID + '\'' +
+        ", containerID=" + containerID +
+        ", requestTimestampMs=" + requestTimestampMs +
+        '}';
+  }
+
+  /**
+   * Requests are ordered by the time at which they were created.
+   * @param o the other request to compare against
+   * @return a negative integer, zero, or a positive integer as this request was created before,
+   *         in the same millisecond as, or after {@code o}
+   */
+  @Override
+  public int compareTo(SamzaResourceRequest o) {
+    return Long.compare(this.requestTimestampMs, o.requestTimestampMs);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
new file mode 100644
index 0000000..7e16ce1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+/**
+ * <p><code>SamzaResourceStatus</code> is an immutable snapshot of the current status of a
+ * <code>StreamProcessor</code> and of the resource that hosts it.</p>
+ *
+ * <p>Each status carries:
+ * <ul>
+ *   <li>the <code>resourceID</code> of the resource,</li>
+ *   <li>the <em>exit code</em> of the StreamProcessor, and</li>
+ *   <li>a <em>diagnostic</em> message, e.g. describing why the StreamProcessor failed or was pre-empted.</li>
+ * </ul>
+ *
+ * <p>The exit codes below are currently modelled after Yarn; their exact semantics are still evolving
+ * as integrations with other cluster managers are added. The failure modes handled today are
+ * termination of a process running in the resource, resource preemption, and disk failures on the host.</p>
+ */
+public final class SamzaResourceStatus {
+  /** The StreamProcessor on the resource completed successfully. */
+  public static final int SUCCESS = 0;
+
+  /** The StreamProcessor running on the resource failed. */
+  public static final int ABORTED = -100;
+
+  /** A disk failed on the host that the resource is on. */
+  public static final int DISK_FAIL = -101;
+
+  /** The cluster manager preempted the resource, i.e. gave it to another processor. */
+  public static final int PREEMPTED = -102;
+
+  private final String resourceID;
+  private final String diagnostics;
+  private final int exitCode;
+
+  /**
+   * Creates a status snapshot for a single resource.
+   *
+   * @param resourceID  identifier of the resource this status describes
+   * @param diagnostics diagnostic message reported for the resource
+   * @param exitCode    exit code of the StreamProcessor, for example {@link #SUCCESS}
+   */
+  public SamzaResourceStatus(String resourceID, String diagnostics, int exitCode) {
+    this.resourceID = resourceID;
+    this.diagnostics = diagnostics;
+    this.exitCode = exitCode;
+  }
+
+  public int getExitCode() {
+    return exitCode;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public String getResourceID() {
+    return resourceID;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("SamzaResourceStatus{");
+    sb.append("resourceID='").append(resourceID).append('\'');
+    sb.append(", diagnostics='").append(diagnostics).append('\'');
+    sb.append(", exitCode=").append(exitCode);
+    return sb.append('}').toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java new file mode 100644 index 0000000..dafd7a7 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.samza.config;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configs when Samza is used with a ClusterManager like Yarn or Mesos. Some of these configs were originally defined
+ * in the yarn namespace. These will be moved to the "cluster-manager" namespace. For now, both configs will be honored
+ * with the cluster-manager.* configs taking precedence. There will be a deprecated config warning when old configs are used.
+ * Later, we'll enforce the new configs.
+ */
+public class ClusterManagerConfig extends MapConfig {
+
+  private static final Logger log = LoggerFactory.getLogger(ClusterManagerConfig.class);
+
+
+  private static final String CLUSTER_MANAGER_FACTORY = "samza.cluster-manager.factory";
+  private static final String CLUSTER_MANAGER_FACTORY_DEFAULT = "org.apache.samza.job.yarn.YarnResourceManagerFactory";
+
+  /**
+   * Sleep interval for the allocator thread in milliseconds
+   */
+  private static final String ALLOCATOR_SLEEP_MS = "cluster-manager.allocator.sleep.ms";
+  public static final String YARN_ALLOCATOR_SLEEP_MS = "yarn.allocator.sleep.ms";
+  private static final int DEFAULT_ALLOCATOR_SLEEP_MS = 3600;
+
+  /**
+   * Number of milliseconds before a container request is considered to have expired
+   */
+  public static final String CONTAINER_REQUEST_TIMEOUT_MS = "yarn.container.request.timeout.ms";
+  public static final String CLUSTER_MANAGER_REQUEST_TIMEOUT_MS = "cluster-manager.container.request.timeout.ms";
+  private static final int DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS = 5000;
+
+  /**
+   * Flag to indicate if host-affinity is enabled for the job or not.
+   * The new key must differ from the legacy yarn.* key; otherwise the legacy key can never be reported as deprecated.
+   */
+  public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled";
+  public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled";
+  private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
+
+  /**
+   * Number of CPU cores to request from the cluster manager per container
+   */
+  public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
+  public static final String CLUSTER_MANAGER_MAX_CORES = "cluster-manager.cpu.cores";
+  private static final int DEFAULT_CPU_CORES = 1;
+
+  /**
+   * Memory, in megabytes, to request from the cluster manager per container
+   */
+  public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
+  public static final String CLUSTER_MANAGER_MEMORY_MB = "cluster-manager.container.memory.mb";
+  private static final int DEFAULT_CONTAINER_MEM = 1024;
+
+  /**
+   * Determines how frequently a container is allowed to fail before we give up and fail the job
+   */
+  public static final String CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms";
+  public static final String CLUSTER_MANAGER_RETRY_WINDOW_MS = "cluster-manager.container.retry.window.ms";
+  private static final int DEFAULT_CONTAINER_RETRY_WINDOW_MS = 300000;
+
+  /**
+   * Maximum number of times Samza tries to restart a failed container
+   */
+  public static final String CONTAINER_RETRY_COUNT = "yarn.container.retry.count";
+  public static final String CLUSTER_MANAGER_CONTAINER_RETRY_COUNT = "cluster-manager.container.retry.count";
+  private static final int DEFAULT_CONTAINER_RETRY_COUNT = 8;
+
+  /**
+   * Determines whether a JMX server should be started on the job coordinator
+   * Default: true
+   */
+  public static final String AM_JMX_ENABLED = "yarn.am.jmx.enabled";
+  public static final String CLUSTER_MANAGER_JMX_ENABLED = "cluster-manager.jobcoordinator.jmx.enabled";
+  private static final boolean DEFAULT_JMX_ENABLED = true;
+
+  /**
+   * The cluster managed job coordinator sleeps for a configurable time before checking again for termination.
+   * The sleep interval of the cluster managed job coordinator.
+   */
+  public static final String CLUSTER_MANAGER_SLEEP_MS = "cluster-manager.jobcoordinator.sleep.interval.ms";
+  private static final int DEFAULT_CLUSTER_MANAGER_SLEEP_MS = 1000;
+
+  public ClusterManagerConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @return the sleep interval of the allocator thread, in milliseconds
+   */
+  public int getAllocatorSleepTime() {
+    return getIntWithFallback(ALLOCATOR_SLEEP_MS, YARN_ALLOCATOR_SLEEP_MS, DEFAULT_ALLOCATOR_SLEEP_MS);
+  }
+
+  /**
+   * @return the number of CPU cores to request per container
+   */
+  public int getNumCores() {
+    return getIntWithFallback(CLUSTER_MANAGER_MAX_CORES, CONTAINER_MAX_CPU_CORES, DEFAULT_CPU_CORES);
+  }
+
+  /**
+   * @return the memory, in MB, to request per container
+   */
+  public int getContainerMemoryMb() {
+    return getIntWithFallback(CLUSTER_MANAGER_MEMORY_MB, CONTAINER_MAX_MEMORY_MB, DEFAULT_CONTAINER_MEM);
+  }
+
+  /**
+   * @return whether host-affinity is enabled for the job
+   */
+  public boolean getHostAffinityEnabled() {
+    return getBooleanWithFallback(CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, HOST_AFFINITY_ENABLED, DEFAULT_HOST_AFFINITY_ENABLED);
+  }
+
+  /**
+   * @return the number of milliseconds before a container request is considered expired
+   */
+  public int getContainerRequestTimeout() {
+    return getIntWithFallback(CLUSTER_MANAGER_REQUEST_TIMEOUT_MS, CONTAINER_REQUEST_TIMEOUT_MS, DEFAULT_CONTAINER_REQUEST_TIMEOUT_MS);
+  }
+
+  /**
+   * @return the maximum number of times Samza tries to restart a failed container
+   */
+  public int getContainerRetryCount() {
+    return getIntWithFallback(CLUSTER_MANAGER_CONTAINER_RETRY_COUNT, CONTAINER_RETRY_COUNT, DEFAULT_CONTAINER_RETRY_COUNT);
+  }
+
+  /**
+   * @return the window, in milliseconds, that determines how frequently a container may fail before the job fails
+   */
+  public int getContainerRetryWindowMs() {
+    return getIntWithFallback(CLUSTER_MANAGER_RETRY_WINDOW_MS, CONTAINER_RETRY_WINDOW_MS, DEFAULT_CONTAINER_RETRY_WINDOW_MS);
+  }
+
+  /**
+   * @return the interval, in milliseconds, between the job coordinator's termination checks
+   */
+  public int getJobCoordinatorSleepInterval() {
+    return getInt(CLUSTER_MANAGER_SLEEP_MS, DEFAULT_CLUSTER_MANAGER_SLEEP_MS);
+  }
+
+  /**
+   * @return the class name of the cluster manager factory
+   */
+  public String getContainerManagerClass() {
+    return get(CLUSTER_MANAGER_FACTORY, CLUSTER_MANAGER_FACTORY_DEFAULT);
+  }
+
+  /**
+   * @return whether a JMX server should be started on the job coordinator
+   */
+  public boolean getJmxEnabled() {
+    return getBooleanWithFallback(CLUSTER_MANAGER_JMX_ENABLED, AM_JMX_ENABLED, DEFAULT_JMX_ENABLED);
+  }
+
+  /**
+   * Reads an int config, preferring {@code key} over the deprecated {@code deprecatedKey}, else {@code defaultValue}.
+   * A deprecation warning is logged when only the deprecated key is present.
+   */
+  private int getIntWithFallback(String key, String deprecatedKey, int defaultValue) {
+    if (containsKey(key)) {
+      return getInt(key);
+    } else if (containsKey(deprecatedKey)) {
+      log.warn("Configuration {} is deprecated. Please use {}", deprecatedKey, key);
+      return getInt(deprecatedKey);
+    } else {
+      return defaultValue;
+    }
+  }
+
+  /**
+   * Reads a boolean config, preferring {@code key} over the deprecated {@code deprecatedKey}, else {@code defaultValue}.
+   * A deprecation warning is logged when only the deprecated key is present.
+   */
+  private boolean getBooleanWithFallback(String key, String deprecatedKey, boolean defaultValue) {
+    if (containsKey(key)) {
+      return getBoolean(key);
+    } else if (containsKey(deprecatedKey)) {
+      log.warn("Configuration {} is deprecated. Please use {}", deprecatedKey, key);
+      return getBoolean(deprecatedKey);
+    } else {
+      return defaultValue;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java index a3281c2..a615d4f 100644 --- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java @@ -88,7 +88,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager { /** * Method to allow read container locality information from coordinator stream. This method is used - * in {@link org.apache.samza.coordinator.JobCoordinator}. + * in {@link org.apache.samza.coordinator.JobModelManager}. * * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress) */ http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java index 6473dfb..11207b2 100644 --- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java +++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java @@ -66,7 +66,7 @@ public class TaskAssignmentManager extends AbstractCoordinatorStreamManager { /** * Method to allow read container task information from coordinator stream. This method is used - * in {@link org.apache.samza.coordinator.JobCoordinator}.
+ * in {@link org.apache.samza.coordinator.JobModelManager}. * * @return the map of taskName: containerId */ http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index 0324e90..9329edf 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -31,7 +31,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JavaStorageConfig; import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.container.SamzaContainerContext; -import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; @@ -117,7 +117,7 @@ public class StorageRecovery extends CommandLine { * map */ private void getContainerModels() { - JobModel jobModel = JobCoordinator.apply(jobConfig).jobModel(); + JobModel jobModel = JobModelManager.apply(jobConfig).jobModel(); containers = jobModel.getContainers(); } http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala index 31b208f..91de18d 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala @@ -32,7 +32,7 @@ import 
org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.{Util, CommandLine, Logging} import org.apache.samza.{Partition, SamzaException} import scala.collection.JavaConversions._ -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager import scala.collection.immutable.HashMap @@ -141,8 +141,8 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage info("Using %s" format manager) // Find all the TaskNames that would be generated for this job config - val coordinator = JobCoordinator(config) - val taskNames = coordinator + val jobModelManager = JobModelManager(config) + val taskNames = jobModelManager .jobModel .getContainers .values http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index bd7f3f5..e9a5108 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -45,12 +45,12 @@ import scala.collection.JavaConversions._ * Helper companion object that is responsible for wiring up a JobCoordinator * given a Config object. */ -object JobCoordinator extends Logging { +object JobModelManager extends Logging { /** * a volatile value to store the current instantiated <code>JobCoordinator</code> */ - @volatile var currentJobCoordinator: JobCoordinator = null + @volatile var currentJobModelManager: JobModelManager = null val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]() /** @@ -59,7 +59,7 @@ object JobCoordinator extends Logging { * configuration. 
The method will use this config to read all configuration * from the coordinator stream, and instantiate a JobCoordinator. */ - def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobCoordinator = { + def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = { val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory() val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap) val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap) @@ -107,7 +107,7 @@ object JobCoordinator extends Logging { jobCoordinator } - def apply(coordinatorSystemConfig: Config): JobCoordinator = apply(coordinatorSystemConfig, new MetricsRegistryMap()) + def apply(coordinatorSystemConfig: Config): JobModelManager = apply(coordinatorSystemConfig, new MetricsRegistryMap()) /** * Build a JobCoordinator using a Samza job's configuration. @@ -122,8 +122,8 @@ object JobCoordinator extends Logging { val server = new HttpServer server.addServlet("/*", new JobServlet(jobModelRef)) - currentJobCoordinator = new JobCoordinator(jobModel, server, streamPartitionCountMonitor) - currentJobCoordinator + currentJobModelManager = new JobModelManager(jobModel, server, streamPartitionCountMonitor) + currentJobModelManager } /** @@ -299,7 +299,7 @@ object JobCoordinator extends Logging { * coordinator's responsibility is simply to propagate the job model, and HTTP * server right now.</p> */ -class JobCoordinator( +class JobModelManager( /** * The data model that describes the Samza job's containers and tasks. 
*/ http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala index 95d01dd..f5c8a46 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala @@ -23,7 +23,7 @@ import java.io.{InputStream, OutputStream} import java.util.concurrent.CountDownLatch import org.apache.samza.SamzaException -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager import org.apache.samza.job.ApplicationStatus.{New, Running, UnsuccessfulFinish} import org.apache.samza.job.util.ProcessKiller import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob} @@ -31,7 +31,7 @@ import org.apache.samza.util.Logging import scala.collection.JavaConversions._ -class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobCoordinator) extends StreamJob with Logging { +class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobModelManager) extends StreamJob with Logging { var jobStatus: Option[ApplicationStatus] = None var process: Process = null http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala index 81ef59a..475df52 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala @@ -25,7 +25,7 @@ import 
java.io.File import org.apache.samza.SamzaException import org.apache.samza.config.{JobConfig, Config} import org.apache.samza.config.TaskConfig._ -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory} import org.apache.samza.util.{Logging, Util} @@ -40,7 +40,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging { throw new SamzaException("Container count larger than 1 is not supported for ProcessJobFactory") } - val coordinator = JobCoordinator(config) + val coordinator = JobModelManager(config) val containerModel = coordinator.jobModel.getContainers.get(0) val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 5acfe87..56881d4 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -29,7 +29,7 @@ import org.apache.samza.config.TaskConfig._ import org.apache.samza.container.SamzaContainer import org.apache.samza.job.{ StreamJob, StreamJobFactory } import org.apache.samza.config.JobConfig._ -import org.apache.samza.coordinator.JobCoordinator +import org.apache.samza.coordinator.JobModelManager /** * Creates a new Thread job with the given config @@ -37,7 +37,7 @@ import org.apache.samza.coordinator.JobCoordinator class ThreadJobFactory extends StreamJobFactory with Logging { def getJob(config: Config): StreamJob = { info("Creating a ThreadJob, which is only meant for debugging.") - val 
coordinator = JobCoordinator(config) + val coordinator = JobModelManager(config) val containerModel = coordinator.jobModel.getContainers.get(0) // Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
new file mode 100644
index 0000000..86c2440
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics
+
+import org.apache.samza.SamzaException
+import org.apache.samza.clustermanager.SamzaApplicationState
+import org.apache.samza.config.Config
+import org.apache.samza.config.MetricsConfig.Config2Metrics
+import org.apache.samza.util.{Logging, Util}
+
+object ContainerProcessManagerMetrics {
+  val sourceName = "ApplicationMaster"
+}
+
+/**
+ * Responsible for wiring up Samza's metrics. Given that Samza has a metric
+ * registry, we might as well use it. This class takes Samza's application
+ * master state, and converts it to metrics.
+ */
+class ContainerProcessManagerMetrics(
+  val config: Config,
+  val state: SamzaApplicationState,
+  val registry: ReadableMetricsRegistry) extends MetricsHelper with Logging {
+
+  val jvm = new JvmMetrics(registry)
+  val reporters = config.getMetricReporterNames.map(reporterName => {
+    val metricsFactoryClassName = config
+      .getMetricsFactoryClass(reporterName)
+      .getOrElse(throw new SamzaException("Metrics reporter %s missing .class config" format reporterName))
+
+    val reporter =
+      Util
+        .getObj[MetricsReporterFactory](metricsFactoryClassName)
+        .getMetricsReporter(reporterName, ContainerProcessManagerMetrics.sourceName, config)
+
+    reporter.register(ContainerProcessManagerMetrics.sourceName, registry)
+    (reporterName, reporter)
+  }).toMap
+
+  /**
+   * Creates the gauges derived from the application state, then starts the JVM metrics and all reporters.
+   */
+  def start() {
+    val mRunningContainers = newGauge("running-containers", () => state.runningContainers.size)
+    val mNeededContainers = newGauge("needed-containers", () => state.neededResources.get())
+    val mCompletedContainers = newGauge("completed-containers", () => state.completedContainers.get())
+    val mFailedContainers = newGauge("failed-containers", () => state.failedContainers.get())
+    val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get())
+    val mContainers = newGauge("container-count", () => state.containerCount)
+
+    val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0)
+    // Fraction of container requests that were matched (matchedResourceRequests / containerRequests).
+    // Computed in floating point: integer division would truncate every partial ratio to 0.
+    val mLocalityMatchedRequests = newGauge(
+      "locality-matched",
+      () => {
+        val requests = state.containerRequests.get()
+        if (requests != 0) {
+          state.matchedResourceRequests.get().toDouble / requests
+        } else {
+          0.0
+        }
+      })
+
+    jvm.start
+    reporters.values.foreach(_.start)
+  }
+
+  /**
+   * Stops all metrics reporters and the JVM metrics started in {@link #start}.
+   */
+  def stop() {
+    reporters.values.foreach(_.stop)
+    jvm.stop
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
new file mode 100644
index 0000000..0d13fb1
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.job.CommandBuilder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * In-memory {@link ClusterResourceManager} for tests. Rather than talking to a cluster manager, it records
+ * every requested, cancelled, released and launched resource in package-visible collections that tests
+ * can inspect. While {@link #nextException} is non-null, every launch fails with a
+ * {@link SamzaContainerLaunchException} wrapping it.
+ */
+public class MockClusterResourceManager extends ClusterResourceManager {
+  Set<SamzaResource> releasedResources = new HashSet<>();
+  List<SamzaResource> resourceRequests = new ArrayList<>();
+  List<SamzaResourceRequest> cancelledRequests = new ArrayList<>();
+  List<SamzaResource> launchedResources = new ArrayList<>();
+  List<MockContainerListener> mockContainerListeners = new ArrayList<>();
+  Throwable nextException = null;
+
+  public MockClusterResourceManager(ClusterResourceManager.Callback callback) {
+    super(callback);
+  }
+
+  @Override
+  public void start() {
+    // Nothing to start: this mock has no backing cluster manager.
+  }
+
+  /**
+   * Fabricates a SamzaResource with the requested cores, memory and preferred host plus a random UUID,
+   * and records it in {@link #resourceRequests}.
+   */
+  @Override
+  public void requestResources(SamzaResourceRequest resourceRequest) {
+    SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), resourceRequest.getMemoryMB(), resourceRequest.getPreferredHost(), UUID.randomUUID().toString());
+    List<SamzaResource> resources = Collections.singletonList(resource);
+    resourceRequests.addAll(resources);
+  }
+
+  @Override
+  public void cancelResourceRequest(SamzaResourceRequest request) {
+    cancelledRequests.add(request);
+  }
+
+  @Override
+  public void releaseResources(SamzaResource resource) {
+    releasedResources.add(resource);
+  }
+
+  /**
+   * Records the launch and notifies every registered listener with the number of resources launched so far.
+   *
+   * @throws SamzaContainerLaunchException wrapping {@link #nextException} when it is set
+   */
+  @Override
+  public void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException {
+    if (nextException != null) {
+      throw new SamzaContainerLaunchException(nextException);
+    }
+    launchedResources.add(resource);
+    for (MockContainerListener listener : mockContainerListeners) {
+      listener.postRunContainer(launchedResources.size());
+    }
+  }
+
+  @Override
+  public void stop(SamzaApplicationState.SamzaAppStatus status) {
+    // Nothing to stop.
+  }
+
+  public void registerContainerListener(MockContainerListener listener) {
+    mockContainerListeners.add(listener);
+  }
+
+  public void clearContainerListeners() {
+    mockContainerListeners.clear();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java new file mode 100644 index 0000000..5079625 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManagerCallback.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.samza.clustermanager; + +import java.util.ArrayList; +import java.util.List; + +public class MockClusterResourceManagerCallback implements ClusterResourceManager.Callback { + List<SamzaResource> resources = new ArrayList<>(); + List<SamzaResourceStatus> resourceStatuses = new ArrayList<>(); + Throwable error; + + @Override + public void onResourcesAvailable(List<SamzaResource> resourceList) { + resources.addAll(resourceList); + } + + @Override + public void onResourcesCompleted(List<SamzaResourceStatus> resourceStatusList) { + resourceStatuses.addAll(resourceStatusList); + } + + @Override + public void onError(Throwable e) { + error = e; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java new file mode 100644 index 0000000..6189fe7 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerAllocator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import org.apache.samza.config.Config; + +import java.lang.reflect.Field; + +import java.util.Map; + +public class MockContainerAllocator extends ContainerAllocator { + public int requestedContainers = 0; + + public MockContainerAllocator(ClusterResourceManager manager, + Config config, + SamzaApplicationState state) { + super(manager, config, state); + } + + @Override + public void requestResources(Map<Integer, String> containerToHostMappings) { + requestedContainers += containerToHostMappings.size(); + super.requestResources(containerToHostMappings); + } + + public ResourceRequestState getContainerRequestState() throws Exception { + Field field = AbstractContainerAllocator.class.getDeclaredField("resourceRequestState"); + field.setAccessible(true); + + return (ResourceRequestState) field.get(this); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java new file mode 100644 index 0000000..db70c38 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerListener.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.clustermanager; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class MockContainerListener { + private final CountDownLatch conditionLatch; + + + private final AsyncCountableCondition containersAdded; + private final AsyncCountableCondition containersReleased; + private final AsyncCountableCondition containersAssigned; + private final AsyncCountableCondition containersRunning; + + private final AsyncCountableCondition[] allConditions; + + public MockContainerListener(int numExpectedContainersAdded, + int numExpectedContainersReleased, + int numExpectedContainersAssigned, + int numExpectedContainersRunning, + Runnable addContainerAssertions, + Runnable releaseContainerAssertions, + Runnable assignContainerAssertions, + Runnable runContainerAssertions) { + containersAdded = new AsyncCountableCondition("containers added", numExpectedContainersAdded, addContainerAssertions); + containersReleased = new AsyncCountableCondition("containers released", numExpectedContainersReleased, releaseContainerAssertions); + containersAssigned = new AsyncCountableCondition("containers assigned", numExpectedContainersAssigned, assignContainerAssertions); + containersRunning = new AsyncCountableCondition("containers running", numExpectedContainersRunning, runContainerAssertions); + + allConditions = new AsyncCountableCondition[] {containersAdded, containersReleased, containersAssigned, containersRunning}; + + int unsatisfiedConditions = 0; + for 
(AsyncCountableCondition condition : allConditions) { + if (!condition.isSatisfied()) { + unsatisfiedConditions++; + } + } + + conditionLatch = new CountDownLatch(unsatisfiedConditions); + } + + public void postAddContainer(int totalAddedContainers) { + if (containersAdded.update(totalAddedContainers)) { + conditionLatch.countDown(); + } + } + + public void postReleaseContainers(int totalReleasedContainers) { + if (containersReleased.update(totalReleasedContainers)) { + conditionLatch.countDown(); + } + } + + public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) { + if (containersAssigned.update(totalAssignedContainers)) { + conditionLatch.countDown(); + } + } + + public void postRunContainer(int totalRunningContainers) { + if (containersRunning.update(totalRunningContainers)) { + conditionLatch.countDown(); + } + } + + /** + * This method should be called in the main thread. It waits for all the conditions to occur in the other + * threads and then verifies that they were in fact satisfied. 
+ */ + public void verify() throws InterruptedException { + conditionLatch.await(5, TimeUnit.SECONDS); + + for (AsyncCountableCondition condition : allConditions) { + condition.verify(); + } + } + + private static class AsyncCountableCondition { + private boolean satisfied = false; + private final int expectedCount; + private final Runnable postConditionAssertions; + private final String name; + private AssertionError assertionError = null; + + private AsyncCountableCondition(String name, int expectedCount, Runnable postConditionAssertions) { + this.name = name; + this.expectedCount = expectedCount; + if (expectedCount == 0) satisfied = true; + this.postConditionAssertions = postConditionAssertions; + } + + public boolean update(int latestCount) { + if (!satisfied && latestCount == expectedCount) { + if (postConditionAssertions != null) { + try { + postConditionAssertions.run(); + } catch (Throwable t) { + assertionError = new AssertionError(String.format("Assertion for '%s' failed", name), t); + } + } + + satisfied = true; + return true; + } + return false; + } + + public boolean isSatisfied() { + return satisfied; + } + + public void verify() { + assertTrue(String.format("Condition '%s' was not satisfied", name), isSatisfied()); + + if (assertionError != null) { + throw assertionError; + } + } + + @Override + public String toString() { + return name; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java new file mode 100644 index 0000000..3aa58b2 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockContainerRequestState.java @@ -0,0 +1,91 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + + +public class MockContainerRequestState extends ResourceRequestState { + private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>(); + private int numAddedContainers = 0; + private int numReleasedContainers = 0; + private int numAssignedContainers = 0; + public Queue<SamzaResourceRequest> assignedRequests = new LinkedList<>(); + + public MockContainerRequestState(ClusterResourceManager manager, + boolean hostAffinityEnabled) { + super(hostAffinityEnabled, manager); + } + + @Override + public synchronized void updateStateAfterAssignment(SamzaResourceRequest request, String assignedHost, SamzaResource resource) { + super.updateStateAfterAssignment(request, assignedHost, resource); + + numAssignedContainers++; + assignedRequests.add(request); + + for (MockContainerListener listener : mockContainerListeners) { + listener.postUpdateRequestStateAfterAssignment(numAssignedContainers); + } + } + + @Override + public synchronized void addResource(SamzaResource container) { + 
super.addResource(container); + + numAddedContainers++; + for (MockContainerListener listener : mockContainerListeners) { + listener.postAddContainer(numAddedContainers); + } + } + + @Override + public synchronized int releaseExtraResources() { + numReleasedContainers += super.releaseExtraResources(); + + for (MockContainerListener listener : mockContainerListeners) { + listener.postReleaseContainers(numReleasedContainers); + } + + return numAddedContainers; + } + + @Override + public void releaseUnstartableContainer(SamzaResource container) { + super.releaseUnstartableContainer(container); + + numReleasedContainers += 1; + for (MockContainerListener listener : mockContainerListeners) { + listener.postReleaseContainers(numReleasedContainers); + } + } + + + public void registerContainerListener(MockContainerListener listener) { + mockContainerListeners.add(listener); + } + + public void clearContainerListeners() { + mockContainerListeners.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java new file mode 100644 index 0000000..4f44ced --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockHttpServer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.clustermanager; + +import org.apache.samza.coordinator.server.HttpServer; +import org.eclipse.jetty.servlet.ServletHolder; + +import java.net.MalformedURLException; +import java.net.URL; + +public class MockHttpServer extends HttpServer { + + public MockHttpServer(String rootPath, int port, String resourceBasePath, ServletHolder defaultHolder) { + super(rootPath, port, resourceBasePath, defaultHolder); + start(); + } + + @Override + public void start() { + super.running_$eq(true); + } + + @Override + public void stop() { + super.running_$eq(false); + } + + @Override + public URL getUrl() { + if (running()) { + try { + return new URL("http://localhost:12345/"); + } catch (MalformedURLException mue) { + mue.printStackTrace(); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java new file mode 100644 index 0000000..f147570 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocator.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.clustermanager; + +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.coordinator.server.HttpServer; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestContainerAllocator { + private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); + private final MockClusterResourceManager manager = new MockClusterResourceManager(callback); + private final Config config = getConfig(); + private final JobModelManager reader = getJobModelReader(1); + private final SamzaApplicationState state = 
new SamzaApplicationState(reader); + private ContainerAllocator containerAllocator; + private MockContainerRequestState requestState; + private Thread allocatorThread; + + @Before + public void setup() throws Exception { + containerAllocator = new ContainerAllocator(manager, config, state); + requestState = new MockContainerRequestState(manager, false); + Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState"); + requestStateField.setAccessible(true); + requestStateField.set(containerAllocator, requestState); + allocatorThread = new Thread(containerAllocator); + } + + + + @After + public void teardown() throws Exception { + reader.stop(); + containerAllocator.stop(); + } + + + private static Config getConfig() { + Config config = new MapConfig(new HashMap<String, String>() { + { + put("yarn.container.count", "1"); + put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); + put("yarn.container.memory.mb", "512"); + put("yarn.package.path", "/foo"); + put("task.inputs", "test-system.test-stream"); + put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); + put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); + put("yarn.container.retry.count", "1"); + put("yarn.container.retry.window.ms", "1999999999"); + put("yarn.allocator.sleep.ms", "10"); + } + }); + + Map<String, String> map = new HashMap<>(); + map.putAll(config); + return new MapConfig(map); + } + + private static JobModelManager getJobModelReader(int containerCount) { + //Ideally, the JobModelReader should be constructed independent of HttpServer. + //That way it becomes easier to mock objects. Save it for later. 
+ + HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); + Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); + for (int i = 0; i < containerCount; i++) { + ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); + containers.put(i, container); + } + JobModel jobModel = new JobModel(getConfig(), containers); + return new JobModelManager(jobModel, server, null); + } + + + /** + * Adds all containers returned to ANY_HOST only + */ + @Test + public void testAddContainer() throws Exception { + assertNull(requestState.getResourcesOnAHost("abc")); + assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + + containerAllocator.addResource(new SamzaResource(1, 1000, "abc", "id1")); + containerAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1")); + + + assertNull(requestState.getResourcesOnAHost("abc")); + assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 2); + } + + /** + * Test requestContainers + */ + @Test + public void testRequestContainers() throws Exception { + Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { + { + put(0, "abc"); + put(1, "def"); + put(2, null); + put(3, "abc"); + } + }; + + allocatorThread.start(); + + containerAllocator.requestResources(containersToHostMapping); + + assertEquals(4, manager.resourceRequests.size()); + + assertNotNull(requestState); + + assertEquals(requestState.numPendingRequests(), 4); + + // If host-affinty is not enabled, it doesn't update the requestMap + assertNotNull(requestState.getRequestsToCountMap()); + assertEquals(requestState.getRequestsToCountMap().keySet().size(), 0); + } + + /** + * Test request containers with no containerToHostMapping makes the right number of requests + */ + @Test + public void testRequestContainersWithNoMapping() throws 
Exception { + int containerCount = 4; + Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>(); + for (int i = 0; i < containerCount; i++) { + containersToHostMapping.put(i, null); + } + allocatorThread.start(); + + containerAllocator.requestResources(containersToHostMapping); + + assertNotNull(requestState); + + assertTrue(requestState.numPendingRequests() == 4); + + // If host-affinty is not enabled, it doesn't update the requestMap + assertNotNull(requestState.getRequestsToCountMap()); + assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0); + } + + /** + * Extra allocated containers that are returned by the RM and unused by the AM should be released. + * Containers are considered "extra" only when there are no more pending requests to fulfill + * @throws Exception + */ + @Test + public void testAllocatorReleasesExtraContainers() throws Exception { + final SamzaResource resource = new SamzaResource(1, 1000, "abc", "id1"); + final SamzaResource resource1 = new SamzaResource(1, 1000, "abc", "id2"); + final SamzaResource resource2 = new SamzaResource(1, 1000, "def", "id3"); + + + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, new Runnable() { + @Override + public void run() { + + assertTrue(manager.releasedResources.contains(resource1)); + assertTrue(manager.releasedResources.contains(resource2)); + + // Test that state is cleaned up + assertEquals(0, requestState.numPendingRequests()); + assertEquals(0, requestState.getRequestsToCountMap().size()); + assertNull(requestState.getResourcesOnAHost("abc")); + assertNull(requestState.getResourcesOnAHost("def")); + } + }, null, null); + requestState.registerContainerListener(listener); + + allocatorThread.start(); + + containerAllocator.requestResource(0, "abc"); + + containerAllocator.addResource(resource); + containerAllocator.addResource(resource1); + 
containerAllocator.addResource(resource2); + + listener.verify(); + } + + + /** + * If the container fails to start e.g because it fails to connect to a NM on a host that + * is down, the allocator should request a new container on a different host. + */ + @Test + public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { + final SamzaResource container = new SamzaResource(1, 1024, "2", "id0"); + final SamzaResource container1 = new SamzaResource(1, 1024, "2", "id0"); + manager.nextException = new IOException("Cant connect to RM"); + + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, new Runnable() { + @Override + public void run() { + // The failed container should be released. The successful one should not. + assertNotNull(manager.releasedResources); + assertEquals(1, manager.releasedResources.size()); + assertTrue(manager.releasedResources.contains(container)); + } + }, + new Runnable() { + @Override + public void run() { + // Test that the first request assignment had a preferred host and the retry didn't + assertEquals(2, requestState.assignedRequests.size()); + + SamzaResourceRequest request = requestState.assignedRequests.remove(); + assertEquals(0, request.getContainerID()); + assertEquals("2", request.getPreferredHost()); + + request = requestState.assignedRequests.remove(); + assertEquals(0, request.getContainerID()); + assertEquals("ANY_HOST", request.getPreferredHost()); + + // This routine should be called after the retry is assigned, but before it's started. + // So there should still be 1 container needed. 
+ assertEquals(1, state.neededResources.get()); + } + }, null + ); + state.neededResources.set(1); + requestState.registerContainerListener(listener); + + containerAllocator.requestResource(0, "2"); + containerAllocator.addResource(container); + containerAllocator.addResource(container1); + allocatorThread.start(); + + listener.verify(); + + } + + +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java new file mode 100644 index 0000000..4fd1018 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

package org.apache.samza.clustermanager;

import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.container.TaskName;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.server.HttpServer;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link ContainerProcessManager}.
 *
 * <p>Every test builds a {@link ContainerProcessManager} against a {@link MockClusterResourceManager}.
 * Tests that need to observe resource requests swap in a {@link MockContainerAllocator} (and the
 * thread that runs it) by writing the manager's private fields through reflection.
 */
public class TestContainerProcessManager {
  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);

  /** Set to {@code true} by the stub allocator thread in {@link #testOnInit()}; reset in {@link #setup()}. */
  private static volatile boolean isRunning = false;

  /**
   * Base configuration shared by all tests. {@code yarn.container.retry.count} is 1, which
   * {@link #testNewContainerRequestedOnFailureWithUnknownCode()} relies on to force a job shutdown
   * on the second failure.
   */
  private Map<String, String> configVals = new HashMap<String, String>() {
    {
      put("yarn.container.count", "1");
      put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory");
      put("yarn.container.memory.mb", "512");
      put("yarn.package.path", "/foo");
      put("task.inputs", "test-system.test-stream");
      put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
      put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
      put("yarn.container.retry.count", "1");
      put("yarn.container.retry.window.ms", "1999999999");
      put("yarn.allocator.sleep.ms", "1");
      put("yarn.container.request.timeout.ms", "2");
    }
  };

  /** Configuration built from {@link #configVals} when the test instance is created. */
  private Config config = new MapConfig(configVals);

  /** Returns a fresh copy of the base configuration. */
  private Config getConfig() {
    Map<String, String> map = new HashMap<>(config);
    return new MapConfig(map);
  }

  /** Returns a copy of the base configuration with {@code yarn.samza.host-affinity.enabled=true}. */
  private Config getConfigWithHostAffinity() {
    Map<String, String> map = new HashMap<>(config);
    map.put("yarn.samza.host-affinity.enabled", "true");
    return new MapConfig(map);
  }

  private HttpServer server = null;

  private SamzaApplicationState state = null;

  /**
   * Builds a {@link JobModelManager} over {@code containerCount} containers with no tasks, and
   * publishes its job model through {@link JobModelManager#jobModelRef()}.
   *
   * <p>The mocked {@link LocalityManager} reports container 0 as mapped to host {@code "abc"}.
   *
   * @param containerCount number of containers (ids {@code 0..containerCount-1}) in the job model
   * @return a coordinator backed by {@link #server}
   */
  private JobModelManager getCoordinator(int containerCount) {
    Map<Integer, ContainerModel> containers = new HashMap<>();
    for (int i = 0; i < containerCount; i++) {
      containers.put(i, new ContainerModel(i, new HashMap<TaskName, TaskModel>()));
    }

    Map<String, String> container0Locality = new HashMap<>();
    container0Locality.put(SetContainerHostMapping.HOST_KEY, "abc");
    Map<Integer, Map<String, String>> localityMap = new HashMap<>();
    localityMap.put(0, container0Locality);

    LocalityManager mockLocalityManager = mock(LocalityManager.class);
    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);

    JobModel jobModel = new JobModel(getConfig(), containers, mockLocalityManager);
    JobModelManager.jobModelRef().getAndSet(jobModel);

    return new JobModelManager(jobModel, this.server, null);
  }

  @Before
  public void setup() throws Exception {
    // Static flag: clear anything a previous test may have left behind.
    isRunning = false;
    server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
    state = new SamzaApplicationState(getCoordinator(1));
  }

  /**
   * Looks up the declared field {@code fieldName} on {@code object}'s class and makes it
   * accessible, so a test can read it or replace its value.
   */
  private Field getPrivateFieldFromTaskManager(String fieldName, ContainerProcessManager object) throws Exception {
    Field field = object.getClass().getDeclaredField(fieldName);
    field.setAccessible(true);
    return field;
  }

  /**
   * Verifies the allocator type chosen with and without host-affinity, and that the configured
   * container memory and CPU cores reach the allocator.
   */
  @Test
  public void testContainerProcessManager() throws Exception {
    Map<String, String> conf = new HashMap<>();
    conf.putAll(getConfig());
    conf.put("yarn.container.memory.mb", "500");
    conf.put("yarn.container.cpu.cores", "5");

    ContainerProcessManager taskManager = new ContainerProcessManager(
        new MapConfig(conf),
        state,
        new MetricsRegistryMap(),
        manager
    );

    AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
    assertEquals(ContainerAllocator.class, allocator.getClass());
    // Asserts that samza exposed container configs is honored by allocator thread
    assertEquals(500, allocator.containerMemoryMb);
    assertEquals(5, allocator.containerNumCpuCores);

    conf.clear();
    conf.putAll(getConfigWithHostAffinity());
    conf.put("yarn.container.memory.mb", "500");
    conf.put("yarn.container.cpu.cores", "5");

    taskManager = new ContainerProcessManager(
        new MapConfig(conf),
        state,
        new MetricsRegistryMap(),
        manager
    );

    allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager);
    assertEquals(HostAwareContainerAllocator.class, allocator.getClass());
    // Asserts that samza exposed container configs is honored by allocator thread
    assertEquals(500, allocator.containerMemoryMb);
    assertEquals(5, allocator.containerNumCpuCores);
  }

  /**
   * Verifies that {@code start()} runs the allocator thread and issues one resource request.
   */
  @Test
  public void testOnInit() throws Exception {
    Config conf = getConfig();
    ContainerProcessManager taskManager = new ContainerProcessManager(
        new MapConfig(conf),
        state,
        new MetricsRegistryMap(),
        manager
    );

    MockContainerAllocator allocator = new MockContainerAllocator(
        manager,
        conf,
        state);

    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);

    // Stub allocator thread: it only records that it was run.
    Thread allocatorThread = new Thread() {
      @Override
      public void run() {
        isRunning = true;
      }
    };
    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, allocatorThread);

    taskManager.start();
    // Wait (bounded, as the old fixed sleep was) for the stub thread to complete.
    allocatorThread.join(1000);

    // Verify Allocator thread has started running
    assertTrue(isRunning);

    // Verify the remaining state
    assertEquals(1, state.neededResources.get());
    assertEquals(1, allocator.requestedContainers);

    taskManager.stop();
  }

  /**
   * Verifies that the allocator thread is alive after {@code start()} and terminates after
   * {@code stop()}.
   */
  @Test
  public void testOnShutdown() throws Exception {
    Config conf = getConfig();
    ContainerProcessManager taskManager = new ContainerProcessManager(
        new MapConfig(conf),
        state,
        new MetricsRegistryMap(),
        manager
    );
    taskManager.start();

    Thread.sleep(100);

    Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
    assertTrue(allocatorThread.isAlive());

    taskManager.stop();

    Thread.sleep(100);
    assertFalse(allocatorThread.isAlive());
  }

  /**
   * Test Task Manager should stop when all containers finish
   */
  @Test
  public void testTaskManagerShouldStopWhenContainersFinish() throws Exception {
    Config conf = getConfig();
    ContainerProcessManager taskManager = new ContainerProcessManager(
        new MapConfig(conf),
        state,
        new MetricsRegistryMap(),
        manager
    );

    taskManager.start();

    assertFalse(taskManager.shouldShutdown());

    taskManager.onResourceCompleted(new SamzaResourceStatus("123", "diagnostics", SamzaResourceStatus.SUCCESS));

    assertTrue(taskManager.shouldShutdown());

    // Release the allocator thread started above.
    taskManager.stop();
  }

  /**
   * Test Task Manager should request a new container when a task fails with unknown exit code
   * When host-affinity is not enabled, it will always request for ANY_HOST
   */
  @Test
  public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception {
    Config conf = getConfig();

    ContainerProcessManager taskManager = new ContainerProcessManager(
        new MapConfig(conf),
        state,
        new MetricsRegistryMap(),
        manager
    );

    MockContainerAllocator allocator = new MockContainerAllocator(
        manager,
        conf,
        state);

    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);

    Thread thread = new Thread(allocator);
    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);

    // start triggers a request
    taskManager.start();

    assertFalse(taskManager.shouldShutdown());
    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());

    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
    taskManager.onResourceAllocated(container);

    // Allow container to run and update state
    Thread.sleep(300);

    // Create first container failure
    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));

    // The above failure should trigger a container request
    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
    assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());

    assertFalse(taskManager.shouldShutdown());
    assertFalse(state.jobHealthy.get());
    assertEquals(2, manager.resourceRequests.size());
    assertEquals(0, manager.releasedResources.size());

    taskManager.onResourceAllocated(container);

    // Allow container to run and update state
    Thread.sleep(1000);

    assertTrue(state.jobHealthy.get());

    // Create a second failure
    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "diagnostics", 1));

    // The above failure should trigger a job shutdown because our retry count is set to 1
    assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
    assertEquals(2, manager.resourceRequests.size());
    assertEquals(0, manager.releasedResources.size());
    assertFalse(state.jobHealthy.get());
    assertTrue(taskManager.shouldShutdown());
    assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);

    taskManager.stop();
  }

  /**
   * Test AM requests a new container when a task fails
   * Error codes with same behavior - Disk failure, preemption and aborted
   *
   * <p>The base config keeps {@code yarn.container.retry.count=1}, yet the three failures below
   * are asserted not to shut the job down.
   */
  @Test
  public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception {
    Config conf = getConfig();

    ContainerProcessManager taskManager = new ContainerProcessManager(
        new MapConfig(conf),
        state,
        new MetricsRegistryMap(),
        manager
    );

    MockContainerAllocator allocator = new MockContainerAllocator(
        manager,
        conf,
        state);
    getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator);

    Thread thread = new Thread(allocator);
    getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, thread);

    // Start the task manager
    taskManager.start();
    assertFalse(taskManager.shouldShutdown());
    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());

    SamzaResource container = new SamzaResource(1, 1000, "abc", "id1");
    taskManager.onResourceAllocated(container);

    // Allow container to run and update state
    Thread.sleep(300);

    // Create container failure - with ContainerExitStatus.DISKS_FAILED
    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Disk failure", SamzaResourceStatus.DISK_FAIL));

    // The above failure should trigger a container request
    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
    assertFalse(taskManager.shouldShutdown());
    assertFalse(state.jobHealthy.get());
    assertEquals(2, manager.resourceRequests.size());
    assertEquals(0, manager.releasedResources.size());
    assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());

    // Create container failure - with ContainerExitStatus.PREEMPTED
    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Preemption", SamzaResourceStatus.PREEMPTED));

    // The above failure should trigger a container request
    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
    assertFalse(taskManager.shouldShutdown());
    assertFalse(state.jobHealthy.get());
    assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());

    // Create container failure - with ContainerExitStatus.ABORTED
    taskManager.onResourceCompleted(new SamzaResourceStatus(container.getResourceID(), "Aborted", SamzaResourceStatus.ABORTED));

    // The above failure should trigger a container request
    assertEquals(1, allocator.getContainerRequestState().numPendingRequests());
    assertEquals(2, manager.resourceRequests.size());
    assertEquals(0, manager.releasedResources.size());
    assertFalse(taskManager.shouldShutdown());
    assertFalse(state.jobHealthy.get());
    assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());

    taskManager.stop();
  }

  /**
   * Allocates a resource to a manager built from the base config, then to a second manager whose
   * config additionally sets {@link JobConfig#SAMZA_FWK_PATH()}.
   */
  @Test
  public void testAppMasterWithFwk() throws Exception {
    ContainerProcessManager taskManager = new ContainerProcessManager(
        new MapConfig(config),
        state,
        new MetricsRegistryMap(),
        manager
    );
    taskManager.start();
    SamzaResource container2 = new SamzaResource(1, 1024, "", "id0");
    assertFalse(taskManager.shouldShutdown());
    taskManager.onResourceAllocated(container2);

    configVals.put(JobConfig.SAMZA_FWK_PATH(), "/export/content/whatever");
    Config config1 = new MapConfig(configVals);

    // Must use config1 (with the framework path); the base config does not carry it.
    ContainerProcessManager taskManager1 = new ContainerProcessManager(
        new MapConfig(config1),
        state,
        new MetricsRegistryMap(),
        manager
    );
    taskManager1.start();
    taskManager1.onResourceAllocated(container2);

    // Release both allocator threads started above.
    taskManager1.stop();
    taskManager.stop();
  }

  @After
  public void teardown() {
    server.stop();
  }

}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
----------------------------------------------------------------------
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java new file mode 100644 index 0000000..7a514e8 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

package org.apache.samza.clustermanager;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

/**
 * Unit tests for {@link ResourceRequestState}: pending resource requests, per-host request counts
 * and allocated resources, with host-affinity both enabled and disabled.
 */
public class TestContainerRequestState {

  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
  private final MockClusterResourceManager manager = new MockClusterResourceManager(callback);

  private static final String ANY_HOST = ResourceRequestState.ANY_HOST;

  /**
   * Test state after a request is submitted.
   */
  @Test
  public void testUpdateRequestState() {
    // Host-affinity is enabled
    ResourceRequestState state = new ResourceRequestState(true, manager);
    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
    state.addResourceRequest(request);

    // The request reaches the cluster resource manager.
    assertNotNull(manager.resourceRequests);
    assertEquals(1, manager.resourceRequests.size());

    assertEquals(1, state.numPendingRequests());

    assertNotNull(state.getRequestsToCountMap());
    assertNotNull(state.getRequestsToCountMap().get("abc"));
    assertEquals(1, state.getRequestsToCountMap().get("abc").get());

    assertNotNull(state.getResourcesOnAHost("abc"));
    assertEquals(0, state.getResourcesOnAHost("abc").size());

    // Host-affinity is not enabled
    ResourceRequestState state1 = new ResourceRequestState(false, manager);
    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, null, 1);
    state1.addResourceRequest(request1);

    assertNotNull(manager.resourceRequests);
    assertEquals(2, manager.resourceRequests.size());

    assertEquals(1, state1.numPendingRequests());

    // Without host-affinity, no request count is recorded under ANY_HOST.
    assertNotNull(state1.getRequestsToCountMap());
    assertNull(state1.getRequestsToCountMap().get(ANY_HOST));
  }

  /**
   * Test addResource() updates the state correctly.
   */
  @Test
  public void testAddContainer() {
    // Add container to ANY_LIST when host-affinity is not enabled
    ResourceRequestState state = new ResourceRequestState(false, manager);
    SamzaResource resource = new SamzaResource(1, 1024, "abc", "id1");

    state.addResource(resource);

    assertNotNull(state.getRequestsToCountMap());
    assertNotNull(state.getResourcesOnAHost(ANY_HOST));

    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(resource, state.getResourcesOnAHost(ANY_HOST).get(0));

    // Container Allocated when there is no request in queue
    ResourceRequestState state1 = new ResourceRequestState(true, manager);
    SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id2");
    state1.addResource(container1);

    assertEquals(0, state1.numPendingRequests());

    assertNull(state1.getResourcesOnAHost("zzz"));
    assertNotNull(state1.getResourcesOnAHost(ANY_HOST));
    assertEquals(1, state1.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container1, state1.getResourcesOnAHost(ANY_HOST).get(0));

    // Container Allocated on a Requested Host
    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", 0));

    assertEquals(1, state1.numPendingRequests());

    assertNotNull(state1.getRequestsToCountMap());
    assertNotNull(state1.getRequestsToCountMap().get("abc"));
    assertEquals(1, state1.getRequestsToCountMap().get("abc").get());

    state1.addResource(resource);

    assertNotNull(state1.getResourcesOnAHost("abc"));
    assertEquals(1, state1.getResourcesOnAHost("abc").size());
    assertEquals(resource, state1.getResourcesOnAHost("abc").get(0));

    // Container Allocated on host that was not requested
    SamzaResource container2 = new SamzaResource(1, 1024, "xyz", "id3");

    state1.addResource(container2);

    assertNull(state1.getResourcesOnAHost("xyz"));
    assertNotNull(state1.getResourcesOnAHost(ANY_HOST));
    assertEquals(2, state1.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container2, state1.getResourcesOnAHost(ANY_HOST).get(1));

    // Extra containers were allocated on a host that was requested
    SamzaResource container3 = new SamzaResource(1, 1024, "abc", "id4");
    state1.addResource(container3);

    assertEquals(3, state1.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container3, state1.getResourcesOnAHost(ANY_HOST).get(2));
  }

  /**
   * Test request state after container is assigned to a host
   * * Assigned on requested host
   * * Assigned on any host
   */
  @Test
  public void testContainerAssignment() throws Exception {
    // Host-affinity enabled
    ResourceRequestState state = new ResourceRequestState(true, manager);
    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 0);
    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", 0);

    state.addResourceRequest(request);
    state.addResourceRequest(request1);

    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
    SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id1");
    state.addResource(container);
    state.addResource(container1);

    assertEquals(2, state.numPendingRequests());
    assertEquals(2, state.getRequestsToCountMap().size());

    assertNotNull(state.getResourcesOnAHost("abc"));
    assertEquals(1, state.getResourcesOnAHost("abc").size());
    assertEquals(container, state.getResourcesOnAHost("abc").get(0));

    assertNotNull(state.getResourcesOnAHost("def"));
    assertEquals(0, state.getResourcesOnAHost("def").size());

    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container1, state.getResourcesOnAHost(ANY_HOST).get(0));

    // Container assigned on the requested host
    state.updateStateAfterAssignment(request, "abc", container);

    assertEquals(request1, state.peekPendingRequest());

    assertNotNull(state.getRequestsToCountMap().get("abc"));
    assertEquals(0, state.getRequestsToCountMap().get("abc").get());

    assertNotNull(state.getResourcesOnAHost("abc"));
    assertEquals(0, state.getResourcesOnAHost("abc").size());

    // Container assigned on any host
    state.updateStateAfterAssignment(request1, ANY_HOST, container1);

    assertEquals(0, state.numPendingRequests());

    assertNotNull(state.getRequestsToCountMap().get("def"));
    assertEquals(0, state.getRequestsToCountMap().get("def").get());

    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
    assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
  }

}
