SAMZA-680: Inverting JobCoordinator and SamzaAppMaster logic
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/947472a0 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/947472a0 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/947472a0 Branch: refs/heads/master Commit: 947472a0b471716d88b0381c37ac372bd012fcf6 Parents: 9f7abf5 Author: Jagadish Venkatramen <[email protected]> Authored: Thu Jun 2 14:50:55 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Jun 2 14:50:55 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 17 + .../AbstractContainerAllocator.java | 269 ++++++++++++ .../ClusterBasedJobCoordinator.java | 224 ++++++++++ .../clustermanager/ClusterResourceManager.java | 158 +++++++ .../clustermanager/ContainerAllocator.java | 55 +++ .../clustermanager/ContainerProcessManager.java | 421 ++++++++++++++++++ .../HostAwareContainerAllocator.java | 107 +++++ .../samza/clustermanager/ResourceFailure.java | 49 +++ .../clustermanager/ResourceManagerFactory.java | 37 ++ .../clustermanager/ResourceRequestState.java | 336 +++++++++++++++ .../clustermanager/SamzaApplicationState.java | 124 ++++++ .../SamzaContainerLaunchException.java | 44 ++ .../samza/clustermanager/SamzaResource.java | 78 ++++ .../clustermanager/SamzaResourceRequest.java | 123 ++++++ .../clustermanager/SamzaResourceStatus.java | 91 ++++ .../samza/config/ClusterManagerConfig.java | 203 +++++++++ .../apache/samza/container/LocalityManager.java | 2 +- .../grouper/task/TaskAssignmentManager.java | 2 +- .../apache/samza/storage/StorageRecovery.java | 4 +- .../samza/checkpoint/CheckpointTool.scala | 6 +- .../samza/coordinator/JobCoordinator.scala | 14 +- .../org/apache/samza/job/local/ProcessJob.scala | 4 +- .../samza/job/local/ProcessJobFactory.scala | 4 +- .../samza/job/local/ThreadJobFactory.scala | 4 +- .../ContainerProcessManagerMetrics.scala | 83 ++++ 
.../MockClusterResourceManager.java | 90 ++++ .../MockClusterResourceManagerCallback.java | 44 ++ .../clustermanager/MockContainerAllocator.java | 48 +++ .../clustermanager/MockContainerListener.java | 145 +++++++ .../MockContainerRequestState.java | 91 ++++ .../samza/clustermanager/MockHttpServer.java | 56 +++ .../clustermanager/TestContainerAllocator.java | 274 ++++++++++++ .../TestContainerProcessManager.java | 426 +++++++++++++++++++ .../TestContainerRequestState.java | 198 +++++++++ .../TestHostAwareContainerAllocator.java | 357 ++++++++++++++++ .../samza/container/TestSamzaContainer.scala | 14 +- .../samza/coordinator/TestJobCoordinator.scala | 9 +- .../apache/samza/job/local/TestProcessJob.scala | 8 +- .../samza/logging/log4j/StreamAppender.java | 6 +- samza-shell/src/main/bash/run-am.sh | 2 +- samza-shell/src/main/bash/run-jc.sh | 25 ++ .../apache/samza/job/yarn/SamzaAppState.java | 8 +- .../apache/samza/job/yarn/SamzaTaskManager.java | 2 +- .../org/apache/samza/job/yarn/YarnAppState.java | 146 +++++++ .../job/yarn/YarnClusterResourceManager.java | 402 +++++++++++++++++ .../samza/job/yarn/YarnContainerRunner.java | 258 +++++++++++ .../job/yarn/YarnResourceManagerFactory.java | 45 ++ .../samza/validation/YarnJobValidationTool.java | 7 +- .../apache/samza/job/yarn/SamzaAppMaster.scala | 4 +- .../job/yarn/SamzaYarnAppMasterLifecycle.scala | 90 ++++ .../job/yarn/SamzaYarnAppMasterService.scala | 80 ++++ .../job/yarn/TestContainerAllocatorCommon.java | 6 +- .../samza/job/yarn/TestSamzaTaskManager.java | 8 +- .../job/yarn/TestSamzaAppMasterLifecycle.scala | 4 +- .../job/yarn/TestSamzaAppMasterService.scala | 6 +- 55 files changed, 5255 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml 
b/checkstyle/import-control.xml index fad7b55..7a09c7e 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -42,6 +42,23 @@ <allow class="org.apache.samza.Partition" /> </subpackage> + <subpackage name="clustermanager"> + <allow class="org.apache.samza.SamzaException" /> + <allow pkg="org.apache.samza.system" /> + <allow pkg="org.apache.samza.util" /> + <allow pkg="org.apache.samza.config" /> + <allow pkg="org.apache.samza.coordinator" /> + <allow pkg="org.apache.samza.job" /> + <allow pkg="org.apache.samza.metrics" /> + <allow pkg="org.apache.samza.serializers" /> + <allow pkg="org.apache.samza.container" /> + <allow pkg="org.eclipse.jetty.servlet" /> + + + <allow class="org.apache.samza.Partition" /> + </subpackage> + + <subpackage name="serializers"> <allow pkg="org.apache.samza.config" /> http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java new file mode 100644 index 0000000..097a476 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.ClusterManagerConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.TaskConfig; +import org.apache.samza.job.CommandBuilder; +import org.apache.samza.job.ShellCommandBuilder; +import org.apache.samza.util.Util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + + +/** + * {@link AbstractContainerAllocator} makes requests for physical resources to the resource manager and also runs + * a container process on an allocated physical resource. Sub-classes should override the assignResourceRequests() + * method to assign resource requests according to some strategy. + * + * See {@link ContainerAllocator} and {@link HostAwareContainerAllocator} for two such strategies + * + * This class is not thread-safe. + */ + +//This class is used in the refactored code path as called by run-jc.sh + +public abstract class AbstractContainerAllocator implements Runnable { + + private static final Logger log = LoggerFactory.getLogger(AbstractContainerAllocator.class); + + /* State that controls the lifecycle of the allocator thread*/ + private volatile boolean isRunning = true; + + /** + * Config and derived config objects + */ + private final TaskConfig taskConfig; + + private final Config config; + + /** + * A ClusterResourceManager for the allocator to request for resources. 
+ */ + protected final ClusterResourceManager clusterResourceManager; + /** + * The allocator sleeps for allocatorSleepIntervalMs before it polls its queue for the next request + */ + protected final int allocatorSleepIntervalMs; + /** + * Each container currently has the same configuration - memory, and numCpuCores. + */ + protected final int containerMemoryMb; + protected final int containerNumCpuCores; + + /** + * State corresponding to num failed containers, running containers etc. + */ + protected final SamzaApplicationState state; + + /** + * ContainerRequestState indicates the state of all unfulfilled container requests and allocated containers + */ + protected final ResourceRequestState resourceRequestState; + + public AbstractContainerAllocator(ClusterResourceManager containerProcessManager, + ResourceRequestState resourceRequestState, + Config config, + SamzaApplicationState state) { + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + this.clusterResourceManager = containerProcessManager; + this.allocatorSleepIntervalMs = clusterManagerConfig.getAllocatorSleepTime(); + this.resourceRequestState = resourceRequestState; + this.containerMemoryMb = clusterManagerConfig.getContainerMemoryMb(); + this.containerNumCpuCores = clusterManagerConfig.getNumCores(); + this.taskConfig = new TaskConfig(config); + this.state = state; + this.config = config; + } + + /** + * Continually schedule StreamProcessors to run on resources obtained from the cluster manager. + * The loop frequency is governed by thread sleeps for allocatorSleepIntervalMs ms. + * + * Terminates when the isRunning flag is cleared. 
+ */ + @Override + public void run() { + while (isRunning) { + try { + assignResourceRequests(); + // Release extra resources and update the entire system's state + resourceRequestState.releaseExtraResources(); + + Thread.sleep(allocatorSleepIntervalMs); + } catch (InterruptedException e) { + log.warn("Got InterruptedException in AllocatorThread.", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("Got unknown Exception in AllocatorThread.", e); + } + } + } + + /** + * Assign resources from the cluster manager and matches them to run container processes on them. + * + */ + protected abstract void assignResourceRequests(); + + /** + * Updates the request state and runs a container process on the specified host. Assumes a resource + * is available on the preferred host, so the caller must verify that before invoking this method. + * + * @param request the {@link SamzaResourceRequest} which is being handled. + * @param preferredHost the preferred host on which the StreamProcessor process should be run or + * {@link ResourceRequestState#ANY_HOST} if there is no host preference. + * @throws + * SamzaException if there is no allocated resource in the specified host. + */ + protected void runStreamProcessor(SamzaResourceRequest request, String preferredHost) { + CommandBuilder builder = getCommandBuilder(request.getContainerID()); + // Get the available resource + SamzaResource resource = peekAllocatedResource(preferredHost); + if (resource == null) + throw new SamzaException("Expected resource was unavailable on host " + preferredHost); + + // Update state + resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource); + int containerID = request.getContainerID(); + + //run container on resource + log.info("Found available resources on {}. 
Assigning request for container_id {} with " + + "timestamp {} to resource {}", + new Object[]{preferredHost, String.valueOf(containerID), request.getRequestTimestampMs(), resource.getResourceID()}); + try { + //launches a StreamProcessor on the resource + clusterResourceManager.launchStreamProcessor(resource, builder); + + if (state.neededResources.decrementAndGet() == 0) { + state.jobHealthy.set(true); + } + state.runningContainers.put(request.getContainerID(), resource); + + } catch (SamzaContainerLaunchException e) { + log.warn(String.format("Got exception while starting resource %s. Requesting a new resource on any host", resource), e); + resourceRequestState.releaseUnstartableContainer(resource); + requestResource(containerID, ResourceRequestState.ANY_HOST); + } + } + + /** + * Called during initial request for resources + * + * @param resourceToHostMappings A Map of [containerId, hostName] containerId is the ID of the container process + * to run on the resource. hostName is the host on which the resource must be allocated. + * The hostName value is null, either + * - when host-affinity is not enabled, or + * - when host-affinity is enabled and job is run for the first time + */ + public void requestResources(Map<Integer, String> resourceToHostMappings) { + for (Map.Entry<Integer, String> entry : resourceToHostMappings.entrySet()) { + int containerId = entry.getKey(); + String preferredHost = entry.getValue(); + if (preferredHost == null) + preferredHost = ResourceRequestState.ANY_HOST; + + requestResource(containerId, preferredHost); + } + } + + /** + * Checks if this allocator has a pending resource request. + * @return {@code true} if there is a pending request, {@code false} otherwise. + */ + protected final boolean hasPendingRequest() { + return peekPendingRequest() != null; + } + + /** + * Retrieves, but does not remove, the next pending request in the queue. + * + * @return the pending request or {@code null} if there is no pending request. 
+ */ + protected final SamzaResourceRequest peekPendingRequest() { + return resourceRequestState.peekPendingRequest(); + } + + /** + * Method to request a resource from the cluster manager + * + * @param containerID Identifier of the container that will be run when a resource is allocated for + * this request + * @param preferredHost Name of the host that you prefer to run the container on + */ + public final void requestResource(int containerID, String preferredHost) { + SamzaResourceRequest request = new SamzaResourceRequest(this.containerNumCpuCores, this.containerMemoryMb, + preferredHost, containerID); + resourceRequestState.addResourceRequest(request); + state.containerRequests.incrementAndGet(); + } + + /** + * Returns true if there are resources allocated on a host. + * @param host the host for which a resource is needed. + * @return {@code true} if there is a resource allocated for the specified host, {@code false} otherwise. + */ + protected boolean hasAllocatedResource(String host) { + return peekAllocatedResource(host) != null; + } + + /** + * Retrieves, but does not remove, the first allocated resource on the specified host. + * + * @param host the host on which a resource is needed. + * @return the first {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one. + */ + protected SamzaResource peekAllocatedResource(String host) { + return resourceRequestState.peekResource(host); + } + + /** + * Returns a command builder with the build environment configured with the containerId. + * @param samzaContainerId to configure the builder with. 
+ * @return the constructed builder object + */ + private CommandBuilder getCommandBuilder(int samzaContainerId) { + String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName()); + CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName); + + cmdBuilder.setConfig(config).setId(samzaContainerId).setUrl(state.jobModelManager.server().getUrl()); + return cmdBuilder; + } + /** + * Adds allocated samzaResource to a synchronized buffer of allocated resources. + * See allocatedResources in {@link ResourceRequestState} + * + * @param samzaResource returned by the ContainerManager + */ + public final void addResource(SamzaResource samzaResource) { + resourceRequestState.addResource(samzaResource); + } + + /** + * Stops the Allocator. Setting this flag to false, exits the allocator loop. + */ + public void stop() { + isRunning = false; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java new file mode 100644 index 0000000..d0d4e34 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import org.apache.samza.SamzaException; +import org.apache.samza.config.ClusterManagerConfig; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ShellCommandConfig; +import org.apache.samza.coordinator.JobModelManager; +import org.apache.samza.metrics.JmxServer; +import org.apache.samza.metrics.MetricsRegistryMap; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Implements a JobCoordinator that is completely independent of the underlying cluster + * manager system. This {@link ClusterBasedJobCoordinator} handles functionality common + * to both Yarn and Mesos. It takes care of + * 1. Requesting resources from an underlying {@link ClusterResourceManager}. + * 2. Ensuring that placement of containers to resources happens (as per whether host affinity + * is configured or not). + * + * Any offer based cluster management system that must integrate with Samza will merely + * implement a {@link ResourceManagerFactory} and a {@link ClusterResourceManager}. + * + * This class is not thread-safe. For safe access in multi-threaded context, invocations + * should be synchronized by the callers. + * + * TODO: + * 1. Refactor ClusterResourceManager to also handle process liveness, process start + * callbacks + * 2. Refactor the JobModelReader to be an interface. + * 3. 
Make ClusterBasedJobCoordinator implement the JobCoordinator API as in SAMZA-881. + * 4. Refactor UI state variables. + * 5. Unit tests. + * 6. Document newly added configs. + */ +public class ClusterBasedJobCoordinator { + + private static final Logger log = LoggerFactory.getLogger(ClusterBasedJobCoordinator.class); + + private final Config config; + + private final ClusterManagerConfig clusterManagerConfig; + + /** + * State to track container failures, host-container mappings + */ + private final SamzaApplicationState state; + + /** + * Metrics to track stats around container failures, needed containers etc. + */ + + //even though some of these can be converted to local variables, it will not be the case + //as we add more methods to the JobCoordinator and completely implement SAMZA-881. + + /** + * Handles callback for allocated containers, failed containers. + */ + private final ContainerProcessManager containerProcessManager; + + /** + * A JobModelManager to return and refresh the {@link org.apache.samza.job.model.JobModel} when required. + */ + private final JobModelManager jobModelManager; + + /* + * The interval for polling the Task Manager for shutdown. + */ + private final long jobCoordinatorSleepInterval; + + /* + * Config specifies if a Jmx server should be started on this Job Coordinator + */ + private final boolean isJmxEnabled; + + /** + * Internal boolean to check if the job coordinator has already been started. + */ + private final AtomicBoolean isStarted = new AtomicBoolean(false); + + private JmxServer jmxServer; + + + /** + * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually + * run the jobcoordinator. + * + * @param coordinatorSystemConfig the coordinator stream config that can be used to read the + * {@link org.apache.samza.job.model.JobModel} from. 
+ */ + public ClusterBasedJobCoordinator(Config coordinatorSystemConfig) { + + MetricsRegistryMap registry = new MetricsRegistryMap(); + + //build a JobModelReader and perform partition assignments. + jobModelManager = buildJobModelManager(coordinatorSystemConfig, registry); + config = jobModelManager.jobModel().getConfig(); + state = new SamzaApplicationState(jobModelManager); + clusterManagerConfig = new ClusterManagerConfig(config); + isJmxEnabled = clusterManagerConfig.getJmxEnabled(); + + jobCoordinatorSleepInterval = clusterManagerConfig.getJobCoordinatorSleepInterval(); + + // build a container process Manager + containerProcessManager = new ContainerProcessManager(config, state, registry); + } + + + /** + * Starts the JobCoordinator. + * + */ + public void run() { + if (!isStarted.compareAndSet(false, true)) { + log.info("Attempting to start an already started job coordinator. "); + return; + } + // set up JmxServer (if jmx is enabled) + if (isJmxEnabled) { + jmxServer = new JmxServer(); + state.jmxUrl = jmxServer.getJmxUrl(); + state.jmxTunnelingUrl = jmxServer.getTunnelingJmxUrl(); + } else { + jmxServer = null; + } + + try { + //initialize JobCoordinator state + log.info("Starting Cluster Based Job Coordinator"); + + containerProcessManager.start(); + + boolean isInterrupted = false; + + while (!containerProcessManager.shouldShutdown() && !isInterrupted) { + try { + Thread.sleep(jobCoordinatorSleepInterval); + } catch (InterruptedException e) { + isInterrupted = true; + log.error("Interrupted in job coordinator loop {} ", e); + Thread.currentThread().interrupt(); + } + } + } catch (Throwable e) { + log.error("Exception thrown in the JobCoordinator loop {} ", e); + throw new SamzaException(e); + } finally { + onShutDown(); + } + } + + /** + * Stops all components of the JobCoordinator. 
+ */ + private void onShutDown() { + + if (containerProcessManager != null) { + try { + containerProcessManager.stop(); + } catch (Throwable e) { + log.error("Exception while stopping task manager {}", e); + } + log.info("Stopped task manager"); + } + + if (jmxServer != null) { + try { + jmxServer.stop(); + log.info("Stopped Jmx Server"); + } catch (Throwable e) { + log.error("Exception while stopping jmx server {}", e); + } + } + } + + private JobModelManager buildJobModelManager(Config coordinatorSystemConfig, MetricsRegistryMap registry) { + JobModelManager jobModelManager = JobModelManager.apply(coordinatorSystemConfig, registry); + return jobModelManager; + } + + + + /** + * The entry point for the {@link ClusterBasedJobCoordinator} + * @param args args + */ + public static void main(String[] args) { + Config coordinatorSystemConfig = null; + final String coordinatorSystemEnv = System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()); + try { + //Read and parse the coordinator system config. 
+ log.info("Parsing coordinator system config {}", coordinatorSystemEnv); + coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class)); + } catch (IOException e) { + log.error("Exception while reading coordinator stream config {}", e); + throw new SamzaException(e); + } + ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig); + jc.run(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java new file mode 100644 index 0000000..715cf66 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.clustermanager; + +import org.apache.samza.job.CommandBuilder; + +import java.util.List; + +/** + * <code>ClusterResourceManager</code> handles communication with a cluster manager + * and provides updates on events such as resource allocations and + * completions. Any offer-based resource management system that integrates with Samza + * will provide an implementation of a ClusterResourceManager API. + * + * This class is meant to be used by implementing a CallbackHandler: + * <pre> + * {@code + * class MyCallbackHandler implements ClusterResourceManager.CallbackHandler { + * public void onResourcesAvailable(List<SamzaResource> resources) { + * [launch a streamprocessor on the resources] + * } + * + * public void onResourcesCompleted(List<SamzaResourceStatus> resourceStatus) { + * [check for exit code to examine diagnostics, and take actions] + * } + * + * public void onError(Throwable error) { + * [stop the container process manager] + * } + * + * } + * } + * </pre> + * + * The lifecycle of a ClusterResourceManager should be managed similarly to the following: + * + * <pre> + * {@code + * ClusterResourceManager processManager = + * new ClusterResourceManager(callback); + * processManager.start(); + * [... request resources ...] + * [... wait for application to complete ...] + * processManager.stop(); + * } + * </pre> + */ + + +/*** + * TODO: + * 1.Investigate what it means to kill a StreamProcessor, and add it as an API here. + * 2.Consider an API for Container Process liveness - ie, to be notified when a StreamProcessor + * joins or leaves the group. Will evolve more as we implement standalone and mesos. 
+ */ + +public abstract class ClusterResourceManager { + + protected final Callback clusterManagerCallback; + + public ClusterResourceManager(Callback callback) { + this.clusterManagerCallback = callback; + } + + public abstract void start(); + + /*** + * Request resources for running container processes + * @param resourceRequest the resourceRequest being made + */ + public abstract void requestResources(SamzaResourceRequest resourceRequest); + + /*** + * Remove a previously submitted resource request. The previous resource request may + * have been submitted to the cluster manager. Even after the remove request, a ContainerProcessManagerCallback + * implementation must be prepared to receive an allocation for the previous request. + * This is merely a best effort cancellation. + * + * @param request, the resource request that must be cancelled + */ + public abstract void cancelResourceRequest(SamzaResourceRequest request); + + + /*** + * If the app cannot use the resource or wants to give up the resource, it can release them. + * @param resource the resource to be released + */ + public abstract void releaseResources(SamzaResource resource); + + /*** + * Requests the launch of a StreamProcessor with the specified context on the resource. + * @param resource the specified resource + * @param builder A builder implementation that encapsulates the context for the + * StreamProcessor. A builder encapsulates the ID for the processor, the + * build environment, the command to execute etc. + * @throws SamzaContainerLaunchException when there's an error during the requesting launch. 
+ * + */ + public abstract void launchStreamProcessor(SamzaResource resource, CommandBuilder builder) throws SamzaContainerLaunchException; + + + public abstract void stop(SamzaApplicationState.SamzaAppStatus status); + + + /*** + *Defines a callback interface for interacting with notifications from a ClusterResourceManager + */ + public interface Callback { + + /*** + * This callback is invoked when there are resources that are to be offered to the application. + * Often, resources that an app requests may not be available. The application must be prepared + * to handle callbacks for resources that it did not request. + * @param resources that are offered to the application + */ + void onResourcesAvailable(List<SamzaResource> resources); + + /*** + * This callback is invoked when resources are no longer available to the application. A + * resource could be marked 'completed' in scenarios like - failure of disk on the host, + * pre-emption of the resource to run another StreamProcessor, exit or termination of the + * StreamProcessor running in the resource. + * + * The SamzaResourceStatus contains diagnostics on why the failure occured + * @param resources statuses for the resources that were completed. + */ + void onResourcesCompleted(List<SamzaResourceStatus> resources); + + /*** + * This callback is invoked when there is an error in the ClusterResourceManager. This is + * guaranteed to be invoked when there is an uncaught exception in any other + * ClusterResourceManager callbacks. + * @param e the underlying Throwable was thrown. 
+ */ + void onError(Throwable e); + } +} + + + + http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java new file mode 100644 index 0000000..5997a7a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import org.apache.samza.config.Config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is the default allocator that will be used by ContainerProcessManager. + * + * When host-affinity is not enabled, this periodically wakes up to assign a container to *ANY* allocated resource. + * If there aren't enough containers, it waits by sleeping for {@code allocatorSleepIntervalMs} milliseconds. 
+ */ +//This class is used in the refactored code path as called by run-jc.sh + +public class ContainerAllocator extends AbstractContainerAllocator { + private static final Logger log = LoggerFactory.getLogger(ContainerAllocator.class); + + public ContainerAllocator(ClusterResourceManager manager, + Config config, SamzaApplicationState state) { + super(manager, new ResourceRequestState(false, manager), config, state); + } + + /** + * During the run() method, the thread sleeps for allocatorSleepIntervalMs ms. It then invokes assignResourceRequests, + * and tries to allocate any unsatisfied request that is still in the request queue {@link ResourceRequestState}) + * with allocated resources, if any. + * + * Since host-affinity is not enabled, all allocated resources are buffered in the list keyed by "ANY_HOST". + * */ + @Override + public void assignResourceRequests() { + while (hasPendingRequest() && hasAllocatedResource(ResourceRequestState.ANY_HOST)) { + SamzaResourceRequest request = peekPendingRequest(); + runStreamProcessor(request, ResourceRequestState.ANY_HOST); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java new file mode 100644 index 0000000..f2db34c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.metrics.ContainerProcessManagerMetrics;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ContainerProcessManager is responsible for requesting containers, handling failures, and notifying the application master that the
+ * job is done.
+ *
+ * The following main threads are involved in the execution of the ContainerProcessManager :
+ *  - The main thread (defined in SamzaAppMaster) that sends requests to the cluster manager.
+ *  - The callback handler thread that receives the responses from cluster manager and handles:
+ *      - Populating a buffer when a container is allocated by the cluster manager
+ *        (allocatedContainers in {@link ResourceRequestState})
+ *      - Identifying the cause of container failure and re-requesting containers from the cluster manager by adding
+ *        requests to the internal requestQueue in {@link ResourceRequestState}
+ *  - The allocator thread defined here assigns the allocated containers to pending requests
+ *    (See {@link org.apache.samza.clustermanager.ContainerAllocator} or {@link org.apache.samza.clustermanager.HostAwareContainerAllocator})
+ */
+public class ContainerProcessManager implements ClusterResourceManager.Callback {
+
+  private static final Logger log = LoggerFactory.getLogger(ContainerProcessManager.class);
+
+  /**
+   * Does this Samza Job need hostAffinity when containers are allocated.
+   */
+  private final boolean hostAffinityEnabled;
+
+  /**
+   * State variables tracking containers allocated, freed, running, released.
+   */
+  private final SamzaApplicationState state;
+
+  /**
+   * Config for this Samza job
+   */
+  private final ClusterManagerConfig clusterManagerConfig;
+  private final JobConfig jobConfig;
+
+  /**
+   * The Allocator matches requests to resources and executes processes.
+   */
+  private final AbstractContainerAllocator containerAllocator;
+  private final Thread allocatorThread;
+
+  /**
+   * A standard interface to request resources.
+   */
+  private final ClusterResourceManager clusterResourceManager;
+
+  /**
+   * Set when a container has failed too often under the retry policy (configured by job.container.retry.count).
+   * Once set, failed containers are no longer re-requested and {@link #shouldShutdown()} returns true.
+   */
+  private volatile boolean tooManyFailedContainers = false;
+
+  /**
+   * Most recent error reported through {@link #onError(Throwable)}; rethrown from {@link #shouldShutdown()}.
+   */
+  private volatile Throwable exceptionOccurred = null;
+
+  /**
+   * A map that keeps track of how many times each container failed. The key is the container ID, and the
+   * value is the {@link ResourceFailure} object that has a count of failures.
+   */
+  private final Map<Integer, ResourceFailure> containerFailures = new HashMap<>();
+
+  private final ContainerProcessManagerMetrics metrics;
+
+  /**
+   * Creates a manager whose {@link ClusterResourceManager} is built by the factory configured in {@code config}.
+   *
+   * @param config   the job configuration
+   * @param state    the shared application state
+   * @param registry registry for the manager's metrics
+   */
+  public ContainerProcessManager(Config config,
+                                 SamzaApplicationState state,
+                                 MetricsRegistryMap registry) {
+    this.state = state;
+    this.clusterManagerConfig = new ClusterManagerConfig(config);
+    this.jobConfig = new JobConfig(config);
+
+    this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
+
+    ResourceManagerFactory factory = getContainerProcessManagerFactory(clusterManagerConfig);
+    this.clusterResourceManager = factory.getClusterResourceManager(this, state);
+    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+
+    this.containerAllocator = createContainerAllocator(hostAffinityEnabled, clusterResourceManager,
+        clusterManagerConfig, config, state);
+
+    this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+    log.info("finished initialization of samza task manager");
+  }
+
+  /**
+   * Creates a manager around an explicitly supplied {@link ClusterResourceManager}.
+   * Package private, used only in tests.
+   */
+  ContainerProcessManager(Config config,
+                          SamzaApplicationState state,
+                          MetricsRegistryMap registry,
+                          ClusterResourceManager resourceManager) {
+    this.state = state;
+    this.clusterManagerConfig = new ClusterManagerConfig(config);
+    this.jobConfig = new JobConfig(config);
+
+    this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();
+
+    this.clusterResourceManager = resourceManager;
+    this.metrics = new ContainerProcessManagerMetrics(config, state, registry);
+
+    this.containerAllocator = createContainerAllocator(hostAffinityEnabled, clusterResourceManager,
+        clusterManagerConfig, config, state);
+
+    this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread");
+    log.info("finished initialization of samza task manager");
+  }
+
+  /**
+   * Chooses the allocator for the job's host-affinity setting: a {@link HostAwareContainerAllocator} (using the
+   * configured container request timeout) when host-affinity is enabled, a {@link ContainerAllocator} otherwise.
+   */
+  private static AbstractContainerAllocator createContainerAllocator(boolean hostAffinityEnabled,
+                                                                     ClusterResourceManager resourceManager,
+                                                                     ClusterManagerConfig clusterManagerConfig,
+                                                                     Config config,
+                                                                     SamzaApplicationState state) {
+    if (hostAffinityEnabled) {
+      return new HostAwareContainerAllocator(resourceManager,
+          clusterManagerConfig.getContainerRequestTimeout(), config, state);
+    }
+    return new ContainerAllocator(resourceManager, config, state);
+  }
+
+  /**
+   * Returns whether the job should shut down. That is the case when too many containers have failed, when the
+   * number of completed containers equals the configured container count, or when the allocator thread is no
+   * longer alive.
+   *
+   * @return true if the job should shut down
+   * @throws SamzaException wrapping the error last reported through {@link #onError(Throwable)}, if any
+   */
+  public boolean shouldShutdown() {
+    log.info(" TaskManager state: Too many FailedContainers: {} No. Completed containers: {} Num Configured containers: {}" +
+        " AllocatorThread liveness: {} ", new Object[]{tooManyFailedContainers, state.completedContainers.get(), state.containerCount, allocatorThread.isAlive()});
+
+    if (exceptionOccurred != null) {
+      log.error("Exception in ContainerProcessManager", exceptionOccurred);
+      throw new SamzaException(exceptionOccurred);
+    }
+    return tooManyFailedContainers || state.completedContainers.get() == state.containerCount.get() || !allocatorThread.isAlive();
+  }
+
+  /**
+   * Starts metrics and the cluster resource manager, requests the initial set of containers (using the locality
+   * recorded in the job model), and starts the allocator thread.
+   */
+  public void start() {
+    metrics.start();
+
+    log.info("Starting Container Process Manager");
+    clusterResourceManager.start();
+
+    log.info("Starting the Samza task manager");
+    final int containerCount = jobConfig.getContainerCount();
+
+    state.containerCount.set(containerCount);
+    state.neededResources.set(containerCount);
+
+    // Request initial set of containers
+    Map<Integer, String> containerToHostMapping = state.jobModelManager.jobModel().getAllContainerLocality();
+
+    containerAllocator.requestResources(containerToHostMapping);
+
+    // Start container allocator thread
+    log.info("Starting the container allocator thread");
+    allocatorThread.start();
+  }
+
+  /**
+   * Stops the allocator thread and waits for it to exit, then stops metrics and the cluster resource manager
+   * (passing it the current job status). Failures while stopping metrics or the resource manager are logged and
+   * do not abort the shutdown.
+   */
+  public void stop() {
+    log.info("Invoked stop of the Samza container process manager");
+
+    // Shutdown allocator thread
+    containerAllocator.stop();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      log.error("Allocator Thread join() threw an interrupted exception", ie);
+      Thread.currentThread().interrupt();
+    }
+
+    if (metrics != null) {
+      try {
+        metrics.stop();
+      } catch (Throwable e) {
+        // Pass the throwable as the last argument without a placeholder so SLF4J logs its stack trace.
+        log.error("Exception while stopping metrics", e);
+      }
+      log.info("Stopped metrics reporters");
+    }
+
+    if (clusterResourceManager != null) {
+      try {
+        clusterResourceManager.stop(state.status);
+      } catch (Throwable e) {
+        log.error("Exception while stopping cluster resource manager", e);
+      }
+      log.info("Stopped cluster resource manager");
+    }
+
+    log.info("Finished stop of Container process manager");
+  }
+
+  /**
+   * Hands a resource allocated by the cluster manager to the allocator.
+   *
+   * @param container the allocated resource
+   */
+  public void onResourceAllocated(SamzaResource container) {
+    log.info("Container allocated from RM on " + container.getHost());
+    containerAllocator.addResource(container);
+  }
+
+  /**
+   * This methods handles the onResourceCompleted callback from the RM. Based on the ContainerExitStatus, it decides
+   * whether a container that exited is marked as complete or failure.
+   *
+   * @param containerStatus of the resource that completed
+   */
+  public void onResourceCompleted(SamzaResourceStatus containerStatus) {
+    String containerIdStr = containerStatus.getResourceID();
+    int containerId = -1;
+    for (Map.Entry<Integer, SamzaResource> entry : state.runningContainers.entrySet()) {
+      if (entry.getValue().getResourceID().equals(containerStatus.getResourceID())) {
+        log.info("Matching container ID found " + entry.getKey() + " " + entry.getValue());
+
+        containerId = entry.getKey();
+        break;
+      }
+    }
+    if (containerId == -1) {
+      log.info("No matching container id found for " + containerStatus.toString());
+    } else {
+      state.runningContainers.remove(containerId);
+    }
+
+    int exitStatus = containerStatus.getExitCode();
+    switch (exitStatus) {
+      case SamzaResourceStatus.SUCCESS:
+        log.info("Container {} completed successfully.", containerIdStr);
+
+        state.completedContainers.incrementAndGet();
+
+        if (containerId != -1) {
+          state.finishedContainers.add(containerId);
+          containerFailures.remove(containerId);
+        }
+
+        if (state.completedContainers.get() == state.containerCount.get()) {
+          log.info("Setting job status to SUCCEEDED, since all containers have been marked as completed.");
+          state.status = SamzaApplicationState.SamzaAppStatus.SUCCEEDED;
+        }
+        break;
+
+      case SamzaResourceStatus.DISK_FAIL:
+      case SamzaResourceStatus.ABORTED:
+      case SamzaResourceStatus.PREEMPTED:
+        log.info("Got an exit code of {}. This means that container {} was "
+                + "killed by YARN, either due to being released by the application "
+                + "master or being 'lost' due to node failures etc. or due to preemption by the RM",
+            exitStatus,
+            containerIdStr);
+
+        state.releasedContainers.incrementAndGet();
+
+        // If this container was assigned some partitions (a containerId), then
+        // clean up, and request a new container for the tasks. This only
+        // should happen if the container was 'lost' due to node failure, not
+        // if the AM released the container.
+        if (containerId != -1) {
+          log.info("Released container {} was assigned task group ID {}. Requesting a new container for the task group.", containerIdStr, containerId);
+
+          state.neededResources.incrementAndGet();
+          state.jobHealthy.set(false);
+
+          // request a container on any host
+          containerAllocator.requestResource(containerId, ResourceRequestState.ANY_HOST);
+        }
+        break;
+
+      default:
+        // TODO: Handle failure more intelligently. Should track NodeFailures!
+        log.info("Container failed for some reason. Let's start it again");
+        log.info("Container " + containerIdStr + " failed with exit code " + exitStatus + " - "
+            + containerStatus.getDiagnostics() + " containerID is " + containerId);
+
+        state.failedContainers.incrementAndGet();
+        state.failedContainersStatus.put(containerIdStr, containerStatus);
+        state.jobHealthy.set(false);
+
+        if (containerId != -1) {
+          handleContainerFailure(containerId, containerIdStr);
+        }
+        break;
+    }
+  }
+
+  /**
+   * Applies the retry policy to a failed container that had a container ID and, unless the job has been marked
+   * as failed, requests a replacement resource.
+   *
+   * <p>The job fails when the failure count for a container ID reaches (>=) the configured retry count AND the
+   * previous failure happened less than the retry window ago. A retry count of 0 fails the job on any container
+   * failure; a retry count below 0 means container failures never fail the job.
+   *
+   * @param containerId    the container ID that was running on the failed resource
+   * @param containerIdStr the resource ID of the failed resource (used for logging)
+   */
+  private void handleContainerFailure(int containerId, String containerIdStr) {
+    state.neededResources.incrementAndGet();
+    // Find out previously running container location
+    String lastSeenOn = state.jobModelManager.jobModel().getContainerToHostValue(containerId, SetContainerHostMapping.HOST_KEY);
+    if (!hostAffinityEnabled || lastSeenOn == null) {
+      lastSeenOn = ResourceRequestState.ANY_HOST;
+    }
+    log.info("Container was last seen on " + lastSeenOn);
+
+    int retryCount = clusterManagerConfig.getContainerRetryCount();
+    int retryWindowMs = clusterManagerConfig.getContainerRetryWindowMs();
+
+    if (retryCount == 0) {
+      log.error("Container ID {} ({}) failed, and retry count is set to 0, so shutting down the application master, and marking the job as failed.", containerId, containerIdStr);
+
+      tooManyFailedContainers = true;
+      // Mark the job as failed, as the retry-window branch below does; otherwise stop() would report a
+      // non-FAILED status to the cluster manager.
+      state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+    } else if (retryCount > 0) {
+      int currentFailCount;
+      long lastFailureTime;
+      ResourceFailure failure = containerFailures.get(containerId);
+      if (failure != null) {
+        currentFailCount = failure.getCount() + 1;
+        lastFailureTime = failure.getLastFailure();
+      } else {
+        currentFailCount = 1;
+        lastFailureTime = 0L;
+      }
+      if (currentFailCount >= retryCount) {
+        long lastFailureMsDiff = System.currentTimeMillis() - lastFailureTime;
+
+        if (lastFailureMsDiff < retryWindowMs) {
+          log.error("Container ID " + containerId + "(" + containerIdStr + ") has failed " + currentFailCount +
+              " times, with last failure " + lastFailureMsDiff + "ms ago. This is greater than retry count of " +
+              retryCount + " and window of " + retryWindowMs + "ms , so shutting down the application master, and marking the job as failed.");
+
+          // We have too many failures, and we're within the window
+          // boundary, so shut down the app master.
+          tooManyFailedContainers = true;
+          state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+        } else {
+          log.info("Resetting fail count for container ID {} back to 1, since last container failure ({}) for " +
+              "this container ID was outside the bounds of the retry window.", containerId, containerIdStr);
+
+          // Reset counter back to 1, since the last failure for this
+          // container happened outside the window boundary.
+          containerFailures.put(containerId, new ResourceFailure(1, System.currentTimeMillis()));
+        }
+      } else {
+        log.info("Current fail count for container ID {} is {}.", containerId, currentFailCount);
+        containerFailures.put(containerId, new ResourceFailure(currentFailCount, System.currentTimeMillis()));
+      }
+    }
+
+    if (!tooManyFailedContainers) {
+      log.info("Requesting a new container for container ID {} on host {}", containerId, lastSeenOn);
+      containerAllocator.requestResource(containerId, lastSeenOn);
+    }
+  }
+
+  @Override
+  public void onResourcesAvailable(List<SamzaResource> resources) {
+    for (SamzaResource resource : resources) {
+      onResourceAllocated(resource);
+    }
+  }
+
+  @Override
+  public void onResourcesCompleted(List<SamzaResourceStatus> resourceStatuses) {
+    for (SamzaResourceStatus resourceStatus : resourceStatuses) {
+      onResourceCompleted(resourceStatus);
+    }
+  }
+
+  /**
+   * An error in the callback terminates the JobCoordinator: the error is recorded and rethrown from
+   * {@link #shouldShutdown()}.
+   *
+   * @param e the underlying exception/error
+   */
+  @Override
+  public void onError(Throwable e) {
+    log.error("Exception occurred in callbacks in the Container Manager", e);
+    exceptionOccurred = e;
+  }
+
+  /**
+   * Instantiates the {@link ResourceManagerFactory} whose class name is configured in a {@link ClusterManagerConfig}.
+   * The factory is used to build the {@link ClusterResourceManager} implementation.
+   *
+   * @param clusterManagerConfig the cluster manager config to read the factory class name from
+   * @return a new instance of the configured {@link ResourceManagerFactory}
+   * @throws SamzaException if the class cannot be found, accessed or instantiated
+   */
+  private ResourceManagerFactory getContainerProcessManagerFactory(final ClusterManagerConfig clusterManagerConfig) {
+    final String containerManagerFactoryClass = clusterManagerConfig.getContainerManagerClass();
+
+    try {
+      return (ResourceManagerFactory) Class.forName(containerManagerFactoryClass).newInstance();
+    } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+      log.error("Could not create ContainerManager from class " + containerManagerFactoryClass, e);
+      throw new SamzaException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
new file mode 100644
index 0000000..da73049
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import org.apache.samza.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the allocator thread that will be used by ContainerProcessManager when host-affinity is enabled for a job. It is similar
+ * to {@link ContainerAllocator}, except that it considers locality for allocation.
+ *
+ * In case of host-affinity, each request ({@link SamzaResourceRequest}) encapsulates the identifier of the container
+ * to be run and a "preferredHost". preferredHost is determined by the locality mappings in the coordinator stream.
+ * This thread periodically wakes up and makes the best-effort to assign a container to the preferredHost. If the
+ * preferredHost is not returned by the cluster manager before the corresponding request expires, the thread
+ * assigns the container to any other host that is allocated next.
+ *
+ * The request expiry is determined by CONTAINER_REQUEST_TIMEOUT and is configurable on a per-job basis.
+ *
+ * If there aren't enough containers, it waits by sleeping for allocatorSleepIntervalMs milliseconds.
+ */
+//This class is used in the refactored code path as called by run-jc.sh
+
+public class HostAwareContainerAllocator extends AbstractContainerAllocator {
+  private static final Logger log = LoggerFactory.getLogger(HostAwareContainerAllocator.class);
+  /**
+   * Milliseconds after its timestamp at which a request that could not be placed on its preferred host expires
+   * and may run on a resource buffered under ANY_HOST instead.
+   */
+  private final int requestTimeout;
+
+  /**
+   * @param manager the cluster resource manager used to request and release resources
+   * @param timeout request timeout in milliseconds (see {@link #requestExpired(SamzaResourceRequest)})
+   * @param config  the job configuration
+   * @param state   the shared application state
+   */
+  public HostAwareContainerAllocator(ClusterResourceManager manager,
+      int timeout, Config config, SamzaApplicationState state) {
+    super(manager, new ResourceRequestState(true, manager), config, state);
+    this.requestTimeout = timeout;
+  }
+
+  /**
+   * Since host-affinity is enabled, all allocated resources are buffered in the list keyed by "preferredHost".
+   *
+   * If the requested host is not available, the thread checks to see if the request has expired.
+   * If it has expired, it runs the container with expectedContainerID on one of the available resources from the
+   * allocatedContainers buffer keyed by "ANY_HOST". If the request can be placed neither way, processing of the
+   * pending requests stops for this pass.
+   */
+  @Override
+  public void assignResourceRequests() {
+    while (hasPendingRequest()) {
+      SamzaResourceRequest request = peekPendingRequest();
+      log.info("Handling request: " + request.getContainerID() + " " + request.getRequestTimestampMs() + " " + request.getPreferredHost());
+      String preferredHost = request.getPreferredHost();
+      int containerID = request.getContainerID();
+
+      if (hasAllocatedResource(preferredHost)) {
+        // Found allocated container at preferredHost
+        log.info("Found a matched-container {} on the preferred host. Running on {}", containerID, preferredHost);
+        runStreamProcessor(request, preferredHost);
+        state.matchedResourceRequests.incrementAndGet();
+      } else {
+        log.info("Did not find any allocated resources on preferred host {} for running container id {}",
+            preferredHost, containerID);
+
+        boolean expired = requestExpired(request);
+        boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
+
+        if (expired && resourceAvailableOnAnyHost) {
+          log.info("Request expired. running on ANY_HOST");
+          runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+        } else {
+          // Either the request has not expired yet or no ANY_HOST resource is buffered; report which one,
+          // since the previous message claimed the timestamp exceeded the timeout in both cases.
+          log.info("Cannot run container {} yet: request expired = {} (requested at {}, timeout {}ms), "
+                  + "free resource available on ANY_HOST = {}. Breaking out of loop.",
+              new Object[]{containerID, expired, request.getRequestTimestampMs(), requestTimeout, resourceAvailableOnAnyHost});
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks if a request has expired, i.e. whether more than {@code requestTimeout} milliseconds have passed between
+   * the request's timestamp and the current time.
+   *
+   * @param request the request to check
+   * @return true if the request is older than the request timeout
+   */
+  private boolean requestExpired(SamzaResourceRequest request) {
+    long currTime = System.currentTimeMillis();
+    boolean requestExpired = currTime - request.getRequestTimestampMs() > requestTimeout;
+    if (requestExpired) {
+      log.info("Request {} with currTime {} has expired", request, currTime);
+    }
+    return requestExpired;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java
new file mode 100644
index 0000000..36e4b6f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+/**
+ * Failure history of a single resource/container: how many times it has failed so far and when the most recent
+ * failure happened. Both values are fixed at construction time.
+ */
+public class ResourceFailure {
+
+  /** How many times the container has failed. */
+  private final int count;
+
+  /**
+   * Time of the container's most recent failure (presumably epoch milliseconds, e.g. from
+   * {@code System.currentTimeMillis()} - confirm against callers).
+   */
+  private final Long lastFailure;
+
+  /**
+   * @param count       number of failures observed so far
+   * @param lastFailure time of the most recent failure
+   */
+  public ResourceFailure(int count, Long lastFailure) {
+    this.count = count;
+    this.lastFailure = lastFailure;
+  }
+
+  /** @return how many times the container has failed */
+  public int getCount() {
+    return this.count;
+  }
+
+  /** @return the time of the most recent failure, exactly as passed to the constructor */
+  public Long getLastFailure() {
+    return this.lastFailure;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
new file mode 100644
index 0000000..fc5636d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceManagerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.clustermanager;
+
+
+/**
+ * Creates the {@link ClusterResourceManager} implementation used to request resources for a Samza job.
+ *
+ * TODO: move the class to Samza-API?
+ */
+public interface ResourceManagerFactory {
+
+  // TODO: Remove the SamzaAppState param and refactor into a smaller, focused class.
+  // TODO: Investigate the possibility of a common Samza UI for all cluster managers - Yarn, Mesos, Standalone.
+  /**
+   * Builds a {@link ClusterResourceManager} that reports its events to {@code callback}.
+   *
+   * @param callback the callback to register with the returned {@link ClusterResourceManager}
+   * @param state    the application state; useful if the ClusterResourceManager wants to host a UI
+   * @return the instantiated {@link ClusterResourceManager}
+   */
+  ClusterResourceManager getClusterResourceManager(ClusterResourceManager.Callback callback, SamzaApplicationState state);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
new file mode 100644
index 0000000..39897c7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.clustermanager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * {@link ResourceRequestState} maintains the state variables for all the resource requests and the allocated resources returned + * by the cluster manager. + * + * This class is thread-safe, and can safely support concurrent accesses without any form of external synchronization. Currently, + * this state is shared across both the Allocator Thread, and the Callback handler thread. + * + */ +public class ResourceRequestState { + private static final Logger log = LoggerFactory.getLogger(ResourceRequestState.class); + public static final String ANY_HOST = "ANY_HOST"; + + /** + * Maintain a map of hostname to a list of resources allocated on this host + */ + private final Map<String, List<SamzaResource>> allocatedResources = new HashMap<>(); + /** + * Represents the queue of resource requests made by the {@link ContainerProcessManager} + */ + private final PriorityQueue<SamzaResourceRequest> requestsQueue = new PriorityQueue<SamzaResourceRequest>(); + /** + * Maintain a map of hostname to the number of requests made for resources on this host + * This state variable is used to look-up whether an allocated resource on a host was ever requested in the past. 
+ * This map is not updated when host-affinity is not enabled + */ + private final Map<String, AtomicInteger> requestsToCountMap = new HashMap<>(); + /** + * Indicates whether host-affinity is enabled or not + */ + private final boolean hostAffinityEnabled; + + private final ClusterResourceManager manager; + + private final Object lock = new Object(); + + public ResourceRequestState(boolean hostAffinityEnabled, ClusterResourceManager manager) { + this.hostAffinityEnabled = hostAffinityEnabled; + this.manager = manager; + } + + /** + * Enqueues a {@link SamzaResourceRequest} to be sent to a {@link ClusterResourceManager}. + * + * @param request {@link SamzaResourceRequest} to be queued + */ + public void addResourceRequest(SamzaResourceRequest request) { + synchronized (lock) { + requestsQueue.add(request); + String preferredHost = request.getPreferredHost(); + + // if host affinity is enabled, update state. + if (hostAffinityEnabled) { + //increment # of requests on the host. + if (requestsToCountMap.containsKey(preferredHost)) { + requestsToCountMap.get(preferredHost).incrementAndGet(); + } else { + requestsToCountMap.put(preferredHost, new AtomicInteger(1)); + } + /** + * The following is important to correlate allocated resource data with the requestsQueue made before. If + * the preferredHost is requested for the first time, the state should reflect that the allocatedResources + * list is empty and NOT null. + */ + + if (!allocatedResources.containsKey(preferredHost)) { + allocatedResources.put(preferredHost, new ArrayList<SamzaResource>()); + } + } + manager.requestResources(request); + } + } + + /** + * Invoked each time a resource is returned from a {@link ClusterResourceManager}. 
+ * @param samzaResource The resource that was returned from the {@link ClusterResourceManager} + */ + public void addResource(SamzaResource samzaResource) { + synchronized (lock) { + if (hostAffinityEnabled) { + String hostName = samzaResource.getHost(); + AtomicInteger requestCount = requestsToCountMap.get(hostName); + // Check if this host was requested for any of the resources + if (requestCount == null || requestCount.get() == 0) { + log.info( + " This host was not requested. {} saving the samzaResource {} in the buffer for ANY_HOST", + hostName, + samzaResource.getResourceID() + ); + addToAllocatedResourceList(ANY_HOST, samzaResource); + } else { + // This host was indeed requested. + int requestCountOnThisHost = requestCount.get(); + List<SamzaResource> allocatedResourcesOnThisHost = allocatedResources.get(hostName); + if (requestCountOnThisHost > 0) { + //there are pending requests for resources on this host. + if (allocatedResourcesOnThisHost == null || allocatedResourcesOnThisHost.size() < requestCountOnThisHost) { + log.info("Got matched samzaResource {} in the buffer for preferredHost: {}", samzaResource.getResourceID(), hostName); + addToAllocatedResourceList(hostName, samzaResource); + } else { + /** + * The RM may allocate more containers on a given host than requested. In such a case, even though the + * requestCount != 0, it will be greater than the total request count for that host. Hence, it should be + * assigned to ANY_HOST + */ + log.info( + "The number of containers already allocated on {} is greater than what was " + + "requested, which is {}. Hence, saving the samzaResource {} in the buffer for ANY_HOST", + new Object[]{ + hostName, + requestCountOnThisHost, + samzaResource.getResourceID() + } + ); + addToAllocatedResourceList(ANY_HOST, samzaResource); + } + } + } + } else { + log.info("Host affinity not enabled. 
Saving the samzaResource {} in the buffer for ANY_HOST", samzaResource.getResourceID()); + addToAllocatedResourceList(ANY_HOST, samzaResource); + } + } + } + + // Appends a samzaResource to the list of allocated resources + private void addToAllocatedResourceList(String host, SamzaResource samzaResource) { + List<SamzaResource> samzaResources = allocatedResources.get(host); + if (samzaResources != null) { + samzaResources.add(samzaResource); + } else { + samzaResources = new ArrayList<SamzaResource>(); + samzaResources.add(samzaResource); + allocatedResources.put(host, samzaResources); + } + } + + /** + * This method updates the state after a request is fulfilled and a resource starts running on a host + * Needs to be synchronized because the state buffers are populated by the AMRMCallbackHandler, whereas it is + * drained by the allocator thread + * + * @param request {@link SamzaResourceRequest} that was fulfilled + * @param assignedHost Host to which the samzaResource was assigned + * @param samzaResource Allocated samzaResource resource that was used to satisfy this request + */ + public void updateStateAfterAssignment(SamzaResourceRequest request, String assignedHost, SamzaResource samzaResource) { + synchronized (lock) { + requestsQueue.remove(request); + allocatedResources.get(assignedHost).remove(samzaResource); + if (hostAffinityEnabled) { + // assignedHost may not always be the preferred host. + // Hence, we should safely decrement the counter for the preferredHost + requestsToCountMap.get(request.getPreferredHost()).decrementAndGet(); + } + // To avoid getting back excess resources + manager.cancelResourceRequest(request); + } + } + + /** + * If requestQueue is empty, all extra resources in the buffer should be released and update the entire system's state + * Needs to be synchronized because it is modifying shared state buffers + * @return the number of resources released. 
+ */ + public int releaseExtraResources() { + synchronized (lock) { + int numReleasedResources = 0; + if (requestsQueue.isEmpty()) { + log.debug("Resource Requests Queue is empty."); + if (hostAffinityEnabled) { + List<String> allocatedHosts = getAllocatedHosts(); + for (String host : allocatedHosts) { + numReleasedResources += releaseResourcesForHost(host); + } + } else { + numReleasedResources += releaseResourcesForHost(ANY_HOST); + } + clearState(); + } + return numReleasedResources; + } + } + + /** + * Releases a container that was allocated and assigned but could not be started. + * e.g. because of a ConnectException while trying to communicate with the NM. + * This method assumes the specified container and associated request have already + * been removed from their respective queues. + * + * @param resource the {@link SamzaResource} to release. + */ + public void releaseUnstartableContainer(SamzaResource resource) { + log.info("Releasing unstartable container {}", resource.getResourceID()); + manager.releaseResources(resource); + } + + + /** + * Releases all allocated resources for the specified host. + * @param host the host for which the resources should be released. + * @return the number of resources released. 
+ */ + private int releaseResourcesForHost(String host) { + int numReleasedResources = 0; + List<SamzaResource> samzaResources = allocatedResources.get(host); + if (samzaResources != null) { + for (SamzaResource resource : samzaResources) { + log.info("Releasing extra resource {} allocated on {}", resource.getResourceID(), host); + manager.releaseResources(resource); + numReleasedResources++; + } + } + return numReleasedResources; + } + + + /** + * Clears all the state variables + * Performed when there are no more unfulfilled requests + */ + private void clearState() { + allocatedResources.clear(); + requestsToCountMap.clear(); + requestsQueue.clear(); + } + + /** + * Returns the list of hosts which has at least 1 allocated Resource in the buffer + * @return list of host names + */ + private List<String> getAllocatedHosts() { + List<String> hostKeys = new ArrayList<String>(); + for (Map.Entry<String, List<SamzaResource>> entry: allocatedResources.entrySet()) { + if (entry.getValue().size() > 0) { + hostKeys.add(entry.getKey()); + } + } + return hostKeys; + } + + /** + * Retrieves, but does not remove, the first allocated resource on the specified host. + * + * @param host the host for which a resource is needed. + * @return the first {@link SamzaResource} allocated for the specified host or {@code null} if there isn't one. + */ + + public SamzaResource peekResource(String host) { + synchronized (lock) { + List<SamzaResource> resourcesOnTheHost = this.allocatedResources.get(host); + + if (resourcesOnTheHost == null || resourcesOnTheHost.isEmpty()) { + return null; + } + return resourcesOnTheHost.get(0); + } + } + + /** + * Retrieves, but does not remove, the next pending request in the queue. + * + * @return the pending request or {@code null} if there is no pending request. 
+ */ + public SamzaResourceRequest peekPendingRequest() { + synchronized (lock) { + return this.requestsQueue.peek(); + } + } + + /** + * Returns the number of pending SamzaResource requests in the queue. + * @return the number of pending requests + */ + public int numPendingRequests() { + synchronized (lock) { + return this.requestsQueue.size(); + } + } + + + /** + * Returns the list of resources allocated on a given host. If no resources were ever allocated on + * the given host, it returns null. This method makes a defensive shallow copy. A shallow copy is + * sufficient because the SamzaResource class does not expose setters. + * + * @param host hostname + * @return list of resources allocated on the given host, or null + */ + public List<SamzaResource> getResourcesOnAHost(String host) { + synchronized (lock) { + List<SamzaResource> samzaResourceList = allocatedResources.get(host); + if (samzaResourceList == null) + return null; + + return new ArrayList<SamzaResource>(samzaResourceList); + } + } + + //Package private, used only in tests. + Map<String, AtomicInteger> getRequestsToCountMap() { + return Collections.unmodifiableMap(requestsToCountMap); + } + + +} http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java new file mode 100644 index 0000000..ca277b3 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.samza.clustermanager; + + import org.apache.samza.coordinator.JobModelManager; + + import java.util.HashSet; + import java.util.Set; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; + import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicInteger; + + /** + * SamzaApplicationState encapsulates state like - completedContainers, runningContainers. This + * class is also used to display information in the Samza UI. Changing any variable name/ + * data structure type in this class WILL break the UI. + * + * TODO: + * 1.Make these variables private, final + * 2.Provide thread-safe accessors. + * //Since the scope of that change is larger, I'm tracking it to work later as a part of SAMZA-902 + * + */ + + public class SamzaApplicationState { + + /** Possible values of {@link #status}; it starts out as UNDEFINED. */ public enum SamzaAppStatus { UNDEFINED, SUCCEEDED, FAILED } + + /** The {@link JobModelManager} supplied to the constructor. */ public final JobModelManager jobModelManager; + + /** + * JMX Server URL, if enabled + * Used for displaying in the AM UI. See scalate/WEB-INF/views/index.scaml + */ + public String jmxUrl = ""; + /** + * JMX Server Tunneling URL, if enabled + * Used for displaying in the AM UI.
See scalate/WEB-INF/views/index.scaml + */ + public String jmxTunnelingUrl = ""; + /** + * The following state variables are required for the correct functioning of the TaskManager + * Some of them are shared between the AMRMCallbackThread and the ContainerAllocator thread, as mentioned below. + */ + + /** + * Number of containers that have completed their execution and exited successfully + */ + public final AtomicInteger completedContainers = new AtomicInteger(0); + + /** + * Number of failed containers + * */ + public final AtomicInteger failedContainers = new AtomicInteger(0); + + /** + * Number of containers released due to extra allocation returned by the RM + */ + public final AtomicInteger releasedContainers = new AtomicInteger(0); + + /** + * ContainerStatus of failed containers. + */ + public final ConcurrentMap<String, SamzaResourceStatus> failedContainersStatus = new ConcurrentHashMap<String, SamzaResourceStatus>(); + + /** + * Number of containers configured for the job + */ + public final AtomicInteger containerCount = new AtomicInteger(0); + + /** + * Set of finished containers - TODO: Can be changed to a counter + * NOTE(review): unlike the concurrent/atomic fields around it, this is a plain HashSet, which is not thread-safe; confirm it is only accessed from a single thread. + */ + public final Set<Integer> finishedContainers = new HashSet<Integer>(); + + /** + * Number of containers needed for the job to be declared healthy + * Modified by both the AMRMCallbackThread and the ContainerAllocator thread + */ + public final AtomicInteger neededResources = new AtomicInteger(0); + + /** + * Map of the samzaContainerId to the {@link SamzaResource} on which it is running + * Modified by both the AMRMCallbackThread and the ContainerAllocator thread + */ + public final ConcurrentMap<Integer, SamzaResource> runningContainers = new ConcurrentHashMap<Integer, SamzaResource>(0); + + /** + * Final status of the application + */ + public SamzaAppStatus status = SamzaAppStatus.UNDEFINED; + + /** + * State indicating whether the job is healthy or not + * Modified by both the callback handler and the ContainerAllocator thread +
*/ + public final AtomicBoolean jobHealthy = new AtomicBoolean(true); + + /** NOTE(review): presumably the number of container requests issued for this job; it is not updated in this class - confirm against the code that increments it. */ public final AtomicInteger containerRequests = new AtomicInteger(0); + + /** NOTE(review): presumably the number of resource requests matched to an allocated resource; it is not updated in this class - confirm against the code that increments it. */ public final AtomicInteger matchedResourceRequests = new AtomicInteger(0); + + /** Creates the application state and stores the given {@link JobModelManager} in {@link #jobModelManager}; all counters start at 0 and {@link #status} at UNDEFINED. */ public SamzaApplicationState(JobModelManager jobModelManager) { + this.jobModelManager = jobModelManager; + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java new file mode 100644 index 0000000..262c60c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.samza.clustermanager; + + /** + * {@code SamzaContainerLaunchException} indicates an {@link Exception} during container launch. + * It can wrap another type of {@link Throwable} or {@link Exception}.
Ultimately, any exception thrown + * during container launch should be of this type so it can be handled explicitly. + */ + public class SamzaContainerLaunchException extends Exception { + private static final long serialVersionUID = -3957939806997013992L; + + /** Creates an exception with no detail message and no cause. */ public SamzaContainerLaunchException() { + super(); + } + + /** Creates an exception with the given detail message {@code s} and cause {@code t}. */ public SamzaContainerLaunchException(String s, Throwable t) { + super(s, t); + } + + /** Creates an exception with the given detail message {@code s} and no cause. */ public SamzaContainerLaunchException(String s) { + super(s); + } + + /** Creates an exception wrapping the given cause {@code t}. */ public SamzaContainerLaunchException(Throwable t) { + super(t); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/947472a0/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java new file mode 100644 index 0000000..ba6ca2c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
/* end of license header */

package org.apache.samza.clustermanager;

import java.util.Objects;

/**
 * Specification of a Samza Resource. A resource is identified by a unique resource ID.
 * A resource is currently comprised of CPUs and Memory resources on a host.
 *
 * <p>All fields are {@code final} and no setters are exposed, so an instance does not change
 * after construction.
 */
public class SamzaResource {
  private final int numCores;
  private final int memoryMb;
  private final String host;
  private final String resourceID;

  //TODO: Investigate adding disk space. Mesos supports disk based reservations.

  /**
   * Creates a resource specification.
   *
   * @param numCores number of CPU cores in this resource
   * @param memoryMb amount of memory in this resource, in MB
   * @param host host on which this resource is located; not part of {@link #equals(Object)}
   * @param resourceID identifier of this resource; a {@code null} value is tolerated by
   *     {@link #equals(Object)} and {@link #hashCode()}
   */
  public SamzaResource(int numCores, int memoryMb, String host, String resourceID) {
    this.numCores = numCores;
    this.memoryMb = memoryMb;
    this.host = host;
    this.resourceID = resourceID;
  }

  /**
   * Two resources are equal when they have the same core count, the same memory and the same
   * resource ID. The host is not part of the comparison.
   *
   * @param o the object to compare with
   * @return {@code true} if {@code o} is a {@code SamzaResource} with the same cores, memory and ID
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SamzaResource other = (SamzaResource) o;
    // Objects.equals keeps the comparison null-safe when resourceID is null.
    return numCores == other.numCores
        && memoryMb == other.memoryMb
        && Objects.equals(resourceID, other.resourceID);
  }

  /**
   * Hash code consistent with {@link #equals(Object)}: derived from cores, memory and resource ID
   * only (the host is excluded).
   *
   * @return the hash code of this resource
   */
  @Override
  public int hashCode() {
    int result = numCores;
    result = 31 * result + memoryMb;
    // Objects.hashCode maps a null resourceID to 0 instead of throwing.
    result = 31 * result + Objects.hashCode(resourceID);
    return result;
  }

  /**
   * Returns a human-readable description including every field, for logging and debugging.
   *
   * @return a description of this resource
   */
  @Override
  public String toString() {
    return "SamzaResource{numCores=" + numCores
        + ", memoryMb=" + memoryMb
        + ", host='" + host + '\''
        + ", resourceID='" + resourceID + '\''
        + '}';
  }

  /** @return number of CPU cores in this resource */
  public int getNumCores() {
    return numCores;
  }

  /** @return amount of memory in this resource, in MB */
  public int getMemoryMb() {
    return memoryMb;
  }

  /** @return host on which this resource is located */
  public String getHost() {
    return host;
  }

  /** @return identifier of this resource */
  public String getResourceID() {
    return resourceID;
  }
}
