| ...
Code Block |
| language |
java |
| title |
ContainerPlacementMessage.java |
| linenumbers |
true |
|
/**
* Encapsulates the request or response payload information exchanged between the ContainerPlacementHandler service
* and external controllers issuing placement actions.
*/
public abstract class ContainerPlacementMessage {
/**
* Status of a container placement action.
*/
public enum StatusCode {
/**
* Indicates that the container placement action is created
*/
CREATED,
/**
* Indicates that the container placement action was rejected because the request was deemed invalid
*/
BAD_REQUEST,
/**
* Indicates that the container placement action is accepted and waiting to be processed
*/
ACCEPTED,
/**
* Indicates that the container placement action is in progress
*/
IN_PROGRESS,
/**
* Indicates that the container placement action has succeeded
*/
SUCCEEDED,
/**
* Indicates that the container placement action has failed
*/
FAILED;
}
/**
* UUID attached to a message. It helps identify duplicate request messages written to the metastore, so that
* actions are not retaken even if the metastore is eventually consistent.
*/
protected final UUID uuid;
/**
* Unique identifier for a deployment, so that messages can be invalidated across job restarts.
* For example, a YARN-based cluster manager should set this to the app attempt id.
*/
protected final String applicationId;
// Logical container id, e.g. 0, 1, 2
protected final String processorId;
// Destination host to which the container is desired to be moved
protected final String destinationHost;
// Optional request expiry which acts as a timeout for any resource request to the cluster resource manager
protected final Duration requestExpiry;
// Status of the current request
protected final StatusCode statusCode;
// Timestamp of the request or response message
protected final long timestamp;
/**
* Constructor taking one argument per field declared above (the body is elided in this listing).
*
* @param uuid uuid of the message
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param requestExpiry optional request expiry
* @param statusCode status of the request
* @param timestamp timestamp of the request or response message
*/
protected ContainerPlacementMessage(UUID uuid, String applicationId, String processorId, String destinationHost,
Duration requestExpiry, StatusCode statusCode, long timestamp) {…}
} |
Code Block |
| language |
java |
| title |
ContainerPlacementRequestMessage |
| linenumbers |
true |
|
/**
* Encapsulates the request sent from the external controller to the JobCoordinator to take a container placement action.
* Neither constructor accepts a status code (bodies are elided in this listing).
*/
public class ContainerPlacementRequestMessage extends ContainerPlacementMessage {
/**
* Creates a request that carries an explicit request expiry.
*
* @param uuid uuid of the request
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param requestExpiry expiry acting as a timeout for the request
* @param timestamp timestamp of the request
*/
public ContainerPlacementRequestMessage(UUID uuid, String applicationId, String processorId, String destinationHost, Duration requestExpiry, long timestamp) {...}
/**
* Creates a request without a request expiry argument.
*
* @param uuid uuid of the request
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param timestamp timestamp of the request
*/
public ContainerPlacementRequestMessage(UUID uuid, String applicationId, String processorId, String destinationHost, long timestamp) {...}
} |
Code Block |
| language |
java |
| title |
ContainerPlacementResponseMessage |
| linenumbers |
true |
|
/**
* Encapsulates the response sent from the JobCoordinator for a container placement action
*/
public class ContainerPlacementResponseMessage extends ContainerPlacementMessage {
// Response message returned alongside the status of the request
private String responseMessage;
/**
* Creates a response that carries an explicit request expiry.
*
* @param uuid uuid of the request this response belongs to
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param requestExpiry expiry acting as a timeout for the request
* @param statusCode status of the request
* @param responseMessage message returned alongside the status
* @param timestamp timestamp of the response
*/
public ContainerPlacementResponseMessage(UUID uuid, String applicationId, String processorId, String destinationHost,
Duration requestExpiry, StatusCode statusCode, String responseMessage, long timestamp) {...}
/**
* Creates a response without a request expiry argument.
*
* @param uuid uuid of the request this response belongs to
* @param applicationId unique identifier of the deployment
* @param processorId logical container id, e.g. 0, 1, 2
* @param destinationHost host to which the container is desired to be moved
* @param statusCode status of the request
* @param responseMessage message returned alongside the status
* @param timestamp timestamp of the response
*/
public ContainerPlacementResponseMessage(UUID uuid, String applicationId, String processorId, String destinationHost,
StatusCode statusCode, String responseMessage, long timestamp) {...}
}
|
Code Block |
| language |
java |
| title |
ContainerPlacementMetadataStore |
| linenumbers |
true |
|
public class ContainerPlacementHandler {
/**
* Entity managing read writes to the metastore for {@link org.apache.samza.container.placement.ContainerPlacementRequestMessage}
* and {@link org.apache.samza.container.placement.ContainerPlacementResponseMessage}
*/
public class ContainerPlacementMetadataStore {
/**
* Writes a {@link ContainerPlacementRequestMessage} to the underlying metastore.
* This method should be used by external controllers to issue a request to JobCoordinator
*
* @param message container placement request
*/
public void writeContainerPlacementRequestMessage(ContainerPlacementRequestMessage message controllers
* to issue a request to JobCoordinator
*
* @param deploymentId identifier of the deployment
* @param processorId logical id of the samza container 0,1,2
* @param destinationHost host where the container is desired to move
* @param requestExpiry optional per request expiry timeout for requests to cluster manager
* @param timestamp timestamp of the request
* @return uuid generated for the request
*/
public UUID writeContainerPlacementRequestMessage(String deploymentId, String processorId, String destinationHost,
Duration requestExpiry, long timestamp) {...}
/**
* Reads a {@link ContainerPlacementRequestMessage} from the underlying metastore
* @param processorIduuid keyuuid of the message, logical processor id of a samza container 0,1,2
request
* @return ContainerPlacementRequestMessage is its present
*/
public Optional<ContainerPlacementRequestMessage> readContainerPlacementRequestMessage(StringUUID processorIduuid) {...}
/**
* Reads a {@link ContainerPlacementResponseMessage} from the underlying metastore
* @param processorIduuid keyuuid of the message, logical processor id of a samza container 0,1,2 request
* @return ContainerPlacementResponseMessage is its present
*/
public Optional<ContainerPlacementResponseMessage> readContainerPlacementResponseMessage(StringUUID processorIduuid) {..}
/**
* Deletes a {@link ContainerPlacementRequestMessage} if present identified by the key {@code processorIduuid}
* @param uuid processorIduuid logicalof processorthe idrequest
0,1,2 */
public void deleteContainerPlacementRequestMessage(StringUUID processorIduuid) {..}
/**
* Deletes a {@link ContainerPlacementResponseMessage} if present identified by the key {@code processorIduuid}
* @param processorIduuid logical processor id 0,1,2
*/
public void deleteContainerPlacementResponseMessage(String processorId) {..}
/**
* Deletes all {@link ContainerPlacementRequestMessage} present in underlying metastoreuuid of the request
*/
public void deleteAllContainerPlacementRequestMessagesdeleteContainerPlacementResponseMessage(UUID uuid) {..}
/**
* Deletes all both {@link ContainerPlacementRequestMessage} and {@link ContainerPlacementResponseMessage} present in underlying metastore identified by uuid
* @param uuid uuid of request and response message
*/
public void deleteAllContainerPlacementResponseMessagesdeleteAllContainerPlacementMessages(UUID uuid) {...}
/**
* Deletes all {@link ContainerPlacementMessage}
* @param uuid uuid of the request or response message
*/
public void deleteAllContainerPlacementMessages(UUID uuid) {..}
/**
* Writes a {@link ContainerPlacementResponseMessage} to the underlying metastore.
* This method should be used by Job Coordinator only to write responses to Container Placement Action
* @param responseMessage response message
*/
void writeContainerPlacementResponseMessage(ContainerPlacementResponseMessage responseMessage) {..}
} |
Code Block |
| language |
java |
| title |
ContainerManager |
| linenumbers |
true |
|
/**
* API listing for ContainerManager: registers container placement actions and handles container launch, launch
* failure, launch success, stop and expired-request events. Method bodies are elided in this listing.
*/
public class ContainerManager {
/**
* Registers a container placement action to move the running container to destination host
*
* @param requestMessage request containing details of placement request
* @param containerAllocator to request physical resources
*/
public void registerContainerPlacementAction(ContainerPlacementRequestMessage requestMessage, ContainerAllocator containerAllocator) {...}
/**
* Handles the container start action for both active and standby containers. This method is invoked by the
* allocator thread
*
* @param request pending request for the preferred host
* @param preferredHost preferred host to start the container
* @param allocatedResource resource allocated from {@link ClusterResourceManager}
* @param resourceRequestState state of request in {@link ContainerAllocator}
* @param allocator to request resources from {@link ClusterResourceManager}
*
* @return true if the container launch is complete, false if the container launch is in progress.
*/
boolean handleContainerLaunch(SamzaResourceRequest request, String preferredHost, SamzaResource allocatedResource,
ResourceRequestState resourceRequestState, ContainerAllocator allocator) {..}
/**
* Handles the container launch failure for active containers and standby (if enabled).
*
* @param processorId logical id of the container, e.g. 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container is requested to be deployed
* @param containerAllocator allocator for requesting resources
*/
void handleContainerLaunchFail(String processorId, String containerId, String preferredHost,
ContainerAllocator containerAllocator) {...}
/**
* Handles the state update on successful launch of a container
*
* @param processorId logical processor id of container 0,1,2
*/
void handleContainerLaunchSuccess(String processorId) {...}
/**
* Handles the action to be taken after the container has been stopped.
*
* @param processorId logical id of the container, e.g. 1,2,3
* @param containerId last known id of the container deployed
* @param preferredHost host on which container was last deployed
* @param exitStatus exit code returned by the container
* @param preferredHostRetryDelay delay to be incurred before requesting resources
* @param containerAllocator allocator for requesting resources
*/
void handleContainerStop(String processorId, String containerId, String preferredHost, int exitStatus,
Duration preferredHostRetryDelay, ContainerAllocator containerAllocator) {..}
/**
* Handles an expired resource request for both active and standby containers.
*
* @param processorId logical id of the container
* @param preferredHost host on which container is requested to be deployed
* @param request pending request for the preferred host
* @param allocator allocator for requesting resources
* @param resourceRequestState state of request in {@link ContainerAllocator}
*/
void handleExpiredRequest(String processorId, String preferredHost,
SamzaResourceRequest request, ContainerAllocator allocator, ResourceRequestState resourceRequestState) {..}
}
|
... |
|
|
|