http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/proto/mesos/v1/scheduler.proto ---------------------------------------------------------------------- diff --git a/myriad-commons/proto/mesos/v1/scheduler.proto b/myriad-commons/proto/mesos/v1/scheduler.proto new file mode 100644 index 0000000..1fb0254 --- /dev/null +++ b/myriad-commons/proto/mesos/v1/scheduler.proto @@ -0,0 +1,420 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto2"; + +import "mesos/v1/mesos.proto"; + +package mesos.v1.scheduler; + +option java_package = "org.apache.mesos.v1.scheduler"; +option java_outer_classname = "Protos"; + + +/** + * Scheduler event API. + * + * An event is described using the standard protocol buffer "union" + * trick, see: + * https://developers.google.com/protocol-buffers/docs/techniques#union. + */ +message Event { + // Possible event types, followed by message definitions if + // applicable. + enum Type { + // This must be the first enum value in this list, to + // ensure that if 'type' is not set, the default value + // is UNKNOWN. This enables enum values to be added + // in a backwards-compatible way. See: MESOS-4997. 
+ UNKNOWN = 0; + + SUBSCRIBED = 1; // See 'Subscribed' below. + OFFERS = 2; // See 'Offers' below. + INVERSE_OFFERS = 9; // See 'InverseOffers' below. + RESCIND = 3; // See 'Rescind' below. + RESCIND_INVERSE_OFFER = 10; // See 'RescindInverseOffer' below. + UPDATE = 4; // See 'Update' below. + MESSAGE = 5; // See 'Message' below. + FAILURE = 6; // See 'Failure' below. + ERROR = 7; // See 'Error' below. + + // Periodic message sent by the Mesos master according to + // 'Subscribed.heartbeat_interval_seconds'. If the scheduler does + // not receive any events (including heartbeats) for an extended + // period of time (e.g., 5 x heartbeat_interval_seconds), there is + // likely a network partition. In such a case the scheduler should + // close the existing subscription connection and resubscribe + // using a backoff strategy. + HEARTBEAT = 8; + } + + // First event received when the scheduler subscribes. + message Subscribed { + required FrameworkID framework_id = 1; + + // This value will be set if the master is sending heartbeats. See + // the comment above on 'HEARTBEAT' for more details. + optional double heartbeat_interval_seconds = 2; + + // Since Mesos 1.1. + optional MasterInfo master_info = 3; + } + + // Received whenever there are new resources that are offered to the + // scheduler. Each offer corresponds to a set of resources on an + // agent. Until the scheduler accepts or declines an offer the + // resources are considered allocated to the scheduler. + message Offers { + repeated Offer offers = 1; + } + + // Received whenever there are resources requested back from the + // scheduler. Each inverse offer specifies the agent, and + // optionally specific resources. Accepting or Declining an inverse + // offer informs the allocator of the scheduler's ability to release + // the specified resources without violating an SLA. If no resources + // are specified then all resources on the agent are requested to be + // released. 
+ message InverseOffers { + repeated InverseOffer inverse_offers = 1; + } + + // Received when a particular offer is no longer valid (e.g., the + // agent corresponding to the offer has been removed) and hence + // needs to be rescinded. Any future calls ('Accept' / 'Decline') made + // by the scheduler regarding this offer will be invalid. + message Rescind { + required OfferID offer_id = 1; + } + + // Received when a particular inverse offer is no longer valid + // (e.g., the agent corresponding to the offer has been removed) + // and hence needs to be rescinded. Any future calls ('Accept' / + // 'Decline') made by the scheduler regarding this inverse offer + // will be invalid. + message RescindInverseOffer { + required OfferID inverse_offer_id = 1; + } + + // Received whenever there is a status update that is generated by + // the executor or agent or master. Status updates should be used by + // executors to reliably communicate the status of the tasks that + // they manage. It is crucial that a terminal update (see TaskState + // in v1/mesos.proto) is sent by the executor as soon as the task + // terminates, in order for Mesos to release the resources allocated + // to the task. It is also the responsibility of the scheduler to + // explicitly acknowledge the receipt of a status update. See + // 'Acknowledge' in the 'Call' section below for the semantics. + // + // A task status update may be used for guaranteed delivery of some + // task-related information, e.g., task's health update. Such + // information may be shadowed by subsequent task status updates, that + // do not preserve fields of the previously sent message. + message Update { + required TaskStatus status = 1; + } + + // Received when a custom message generated by the executor is + // forwarded by the master. Note that this message is not + // interpreted by Mesos and is only forwarded (without reliability + // guarantees) to the scheduler. 
It is up to the executor to retry + // if the message is dropped for any reason. + message Message { + required AgentID agent_id = 1; + required ExecutorID executor_id = 2; + required bytes data = 3; + } + + // Received when an agent is removed from the cluster (e.g., failed + // health checks) or when an executor is terminated. Note that, this + // event coincides with receipt of terminal UPDATE events for any + // active tasks belonging to the agent or executor and receipt of + // 'Rescind' events for any outstanding offers belonging to the + // agent. Note that there is no guaranteed order between the + // 'Failure', 'Update' and 'Rescind' events when an agent or executor + // is removed. + // TODO(vinod): Consider splitting the lost agent and terminated + // executor into separate events and ensure it's reliably generated. + message Failure { + optional AgentID agent_id = 1; + + // If this was just a failure of an executor on an agent then + // 'executor_id' will be set and possibly 'status' (if we were + // able to determine the exit status). + optional ExecutorID executor_id = 2; + + // On Posix, `status` corresponds to termination information in the + // `stat_loc` area returned from a `waitpid` call. On Windows, `status` + // is obtained via calling the `GetExitCodeProcess()` function. For + // messages coming from Posix agents, schedulers need to apply + // `WEXITSTATUS` family macros or equivalent transformations to obtain + // exit codes. + // + // TODO(alexr): Consider unifying Windows and Posix behavior by returning + // exit code here, see MESOS-7241. + optional int32 status = 3; + } + + // Received when there is an unrecoverable error in the scheduler (e.g., + // scheduler failed over, rate limiting, authorization errors etc.). The + // scheduler should abort on receiving this event. 
+ message Error { + required string message = 1; + } + + // Type of the event, indicates which optional field below should be + // present if that type has a nested message definition. + // Enum fields should be optional, see: MESOS-4997. + optional Type type = 1; + + optional Subscribed subscribed = 2; + optional Offers offers = 3; + optional InverseOffers inverse_offers = 9; + optional Rescind rescind = 4; + optional RescindInverseOffer rescind_inverse_offer = 10; + optional Update update = 5; + optional Message message = 6; + optional Failure failure = 7; + optional Error error = 8; +} + + +/** + * Scheduler call API. + * + * Like Event, a Call is described using the standard protocol buffer + * "union" trick (see above). + */ +message Call { + // Possible call types, followed by message definitions if + // applicable. + enum Type { + // See comments above on `Event::Type` for more details on this enum value. + UNKNOWN = 0; + + SUBSCRIBE = 1; // See 'Subscribe' below. + TEARDOWN = 2; // Shuts down all tasks/executors and removes framework. + ACCEPT = 3; // See 'Accept' below. + DECLINE = 4; // See 'Decline' below. + ACCEPT_INVERSE_OFFERS = 13; // See 'AcceptInverseOffers' below. + DECLINE_INVERSE_OFFERS = 14; // See 'DeclineInverseOffers' below. + REVIVE = 5; // Removes any previous filters set via ACCEPT or DECLINE. + KILL = 6; // See 'Kill' below. + SHUTDOWN = 7; // See 'Shutdown' below. + ACKNOWLEDGE = 8; // See 'Acknowledge' below. + RECONCILE = 9; // See 'Reconcile' below. + MESSAGE = 10; // See 'Message' below. + REQUEST = 11; // See 'Request' below. + SUPPRESS = 12; // Inform master to stop sending offers to the framework. + + // TODO(benh): Consider adding an 'ACTIVATE' and 'DEACTIVATE' for + // already subscribed frameworks as a way of stopping offers from + // being generated and other events from being sent by the master. 
+ // Note that this functionality existed originally to support + // SchedulerDriver::abort which was only necessary to handle + // exceptions getting thrown from within Scheduler callbacks, + // something that is not an issue with the Event/Call API. + } + + // Subscribes the scheduler with the master to receive events. A + // scheduler must send other calls only after it has received the + // SUBSCRIBED event. + message Subscribe { + // See the comments below on 'framework_id' on the semantics for + // 'framework_info.id'. + required FrameworkInfo framework_info = 1; + + // List of suppressed roles for which the framework does not wish to be + // offered resources. The framework can decide to suppress all or a subset + // of roles the framework (re)registers as. + repeated string suppressed_roles = 2; + } + + // Accepts an offer, performing the specified operations + // in a sequential manner. + // + // E.g. Launch a task with a newly reserved persistent volume: + // + // Accept { + // offer_ids: [ ... ] + // operations: [ + // { type: RESERVE, + // reserve: { resources: [ disk(role):2 ] } } + // { type: CREATE, + // create: { volumes: [ disk(role):1+persistence ] } } + // { type: LAUNCH, + // launch: { task_infos ... disk(role):1;disk(role):1+persistence } } + // ] + // } + // + // Note that any of the offer's resources not used in the 'Accept' + // call (e.g., to launch a task) are considered unused and might be + // reoffered to other frameworks. In other words, the same OfferID + // cannot be used in more than one 'Accept' call. + message Accept { + repeated OfferID offer_ids = 1; + repeated Offer.Operation operations = 2; + optional Filters filters = 3; + } + + // Declines an offer, signaling the master to potentially reoffer + // the resources to a different framework. Note that this is the same + // as sending an Accept call with no operations. See comments on + // top of 'Accept' for semantics. 
+ message Decline { + repeated OfferID offer_ids = 1; + optional Filters filters = 2; + } + + // Accepts an inverse offer. Inverse offers should only be accepted + // if the resources in the offer can be safely evacuated before the + // provided unavailability. + message AcceptInverseOffers { + repeated OfferID inverse_offer_ids = 1; + optional Filters filters = 2; + } + + // Declines an inverse offer. Inverse offers should be declined if + // the resources in the offer might not be safely evacuated before + // the provided unavailability. + message DeclineInverseOffers { + repeated OfferID inverse_offer_ids = 1; + optional Filters filters = 2; + } + + // Revive offers for the specified roles. If `roles` is empty, + // the `REVIVE` call will revive offers for all of the roles + // the framework is currently subscribed to. + message Revive { + repeated string roles = 1; + } + + // Kills a specific task. If the scheduler has a custom executor, + // the kill is forwarded to the executor and it is up to the + // executor to kill the task and send a TASK_KILLED (or TASK_FAILED) + // update. Note that Mesos releases the resources for a task once it + // receives a terminal update (See TaskState in v1/mesos.proto) for + // it. If the task is unknown to the master, a TASK_LOST update is + // generated. + // + // If a task within a task group is killed before the group is + // delivered to the executor, all tasks in the task group are + // killed. When a task group has been delivered to the executor, + // it is up to the executor to decide how to deal with the kill. + // Note The default Mesos executor will currently kill all the + // tasks in the task group if it gets a kill for any task. + message Kill { + required TaskID task_id = 1; + optional AgentID agent_id = 2; + + // If set, overrides any previously specified kill policy for this task. + // This includes 'TaskInfo.kill_policy' and 'Executor.kill.kill_policy'. 
+ // Can be used to forcefully kill a task which is already being killed. + optional KillPolicy kill_policy = 3; + } + + // Shuts down a custom executor. When the executor gets a shutdown + // event, it is expected to kill all its tasks (and send TASK_KILLED + // updates) and terminate. If the executor doesn't terminate within + // a certain timeout (configurable via + // '--executor_shutdown_grace_period' agent flag), the agent will + // forcefully destroy the container (executor and its tasks) and + // transition its active tasks to TASK_LOST. + message Shutdown { + required ExecutorID executor_id = 1; + required AgentID agent_id = 2; + } + + // Acknowledges the receipt of status update. Schedulers are + // responsible for explicitly acknowledging the receipt of status + // updates that have 'Update.status().uuid()' field set. Such status + // updates are retried by the agent until they are acknowledged by + // the scheduler. + message Acknowledge { + required AgentID agent_id = 1; + required TaskID task_id = 2; + required bytes uuid = 3; + } + + // Allows the scheduler to query the status for non-terminal tasks. + // This causes the master to send back the latest task status for + // each task in 'tasks', if possible. Tasks that are no longer known + // will result in a TASK_LOST, TASK_UNKNOWN, or TASK_UNREACHABLE update. + // If 'tasks' is empty, then the master will send the latest status + // for each task currently known. + message Reconcile { + // TODO(vinod): Support arbitrary queries rather than just the state of tasks. + message Task { + required TaskID task_id = 1; + optional AgentID agent_id = 2; + } + + repeated Task tasks = 1; + } + + // Sends arbitrary binary data to the executor. Note that Mesos + // neither interprets this data nor makes any guarantees about the + // delivery of this message to the executor. 
+ message Message { + required AgentID agent_id = 1; + required ExecutorID executor_id = 2; + required bytes data = 3; + } + + // Requests a specific set of resources from Mesos's allocator. If + // the allocator has support for this, corresponding offers will be + // sent asynchronously via the OFFERS event(s). + // + // NOTE: The built-in hierarchical allocator doesn't have support + // for this call and hence simply ignores it. + message Request { + repeated mesos.v1.Request requests = 1; + } + + // Suppress offers for the specified roles. If `roles` is empty, + // the `SUPPRESS` call will suppress offers for all of the roles + // the framework is currently subscribed to. + message Suppress { + repeated string roles = 1; + } + + // Identifies who generated this call. Master assigns a framework id + // when a new scheduler subscribes for the first time. Once assigned, + // the scheduler must set the 'framework_id' here and within its + // FrameworkInfo (in any further 'Subscribe' calls). This allows the + // master to identify a scheduler correctly across disconnections, + // failovers, etc. + optional FrameworkID framework_id = 1; + + // Type of the call, indicates which optional field below should be + // present if that type has a nested message definition. + // See comments on `Event::Type` above on the reasoning behind this field being optional. + optional Type type = 2; + + optional Subscribe subscribe = 3; + optional Accept accept = 4; + optional Decline decline = 5; + optional AcceptInverseOffers accept_inverse_offers = 13; + optional DeclineInverseOffers decline_inverse_offers = 14; + optional Revive revive = 15; + optional Kill kill = 6; + optional Shutdown shutdown = 7; + optional Acknowledge acknowledge = 8; + optional Reconcile reconcile = 9; + optional Message message = 10; + optional Request request = 11; + optional Suppress suppress = 16; +}
http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/Executor.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/Executor.java b/myriad-commons/src/main/java/org/apache/mesos/Executor.java new file mode 100644 index 0000000..095ca65 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/Executor.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos; + +import org.apache.mesos.Protos.*; + +/** + * Callback interface to be implemented by frameworks' executors. + * Note that only one callback will be invoked at a time, so it is not + * recommended that you block within a callback because it may cause a + * deadlock. + * <p> + * Each callback includes a reference to the executor driver that was + * used to run this executor. The reference will not change for the + * duration of an executor (i.e., from the point you do + * {@link ExecutorDriver#start} to the point that + * {@link ExecutorDriver#join} returns). 
+ * This is intended for convenience so that an executor + * doesn't need to store a reference to the driver itself. + */ +public interface Executor { + + /** + * Invoked once the executor driver has been able to successfully + * connect with Mesos. In particular, a scheduler can pass some + * data to its executors through the {@link ExecutorInfo#getData()} + * field. + * + * @param driver The executor driver that was registered and connected + * to the Mesos cluster. + * @param executorInfo Describes information about the executor that was + * registered. + * @param frameworkInfo Describes the framework that was registered. + * @param slaveInfo Describes the slave that will be used to launch + * the tasks for this executor. + * + * @see ExecutorDriver + * @see MesosSchedulerDriver + */ + // TODO(vinod): Add a new reregistered callback for when the executor + // re-connects with a restarted slave. + void registered(ExecutorDriver driver, + ExecutorInfo executorInfo, + FrameworkInfo frameworkInfo, + SlaveInfo slaveInfo); + + /** + * Invoked when the executor re-registers with a restarted slave. + * + * @param driver The executor driver that was re-registered with the + * Mesos master. + * @param slaveInfo Describes the slave that will be used to launch + * the tasks for this executor. + * + * @see ExecutorDriver + */ + void reregistered(ExecutorDriver driver, SlaveInfo slaveInfo); + + /** + * Invoked when the executor becomes "disconnected" from the slave + * (e.g., the slave is being restarted due to an upgrade). + * + * @param driver The executor driver that was disconnected. + */ + void disconnected(ExecutorDriver driver); + + /** + * Invoked when a task has been launched on this executor (initiated + * via {@link SchedulerDriver#launchTasks}). Note that this task can be + * realized with a thread, a process, or some simple computation, + * however, no other callbacks will be invoked on this executor + * until this callback has returned. 
+ * + * @param driver The executor driver that launched the task. + * @param task Describes the task that was launched. + * + * @see ExecutorDriver + * @see TaskInfo + */ + void launchTask(ExecutorDriver driver, TaskInfo task); + + /** + * Invoked when a task running within this executor has been killed + * (via {@link org.apache.mesos.SchedulerDriver#killTask}). Note that no + * status update will be sent on behalf of the executor, the executor is + * responsible for creating a new TaskStatus (i.e., with TASK_KILLED) + * and invoking {@link ExecutorDriver#sendStatusUpdate}. + * + * @param driver The executor driver that owned the task that was killed. + * @param taskId The ID of the task that was killed. + * + * @see ExecutorDriver + * @see TaskID + */ + void killTask(ExecutorDriver driver, TaskID taskId); + + /** + * Invoked when a framework message has arrived for this + * executor. These messages are best effort; do not expect a + * framework message to be retransmitted in any reliable fashion. + * + * @param driver The executor driver that received the message. + * @param data The message payload. + * + * @see ExecutorDriver + */ + void frameworkMessage(ExecutorDriver driver, byte[] data); + + /** + * Invoked when the executor should terminate all of its currently + * running tasks. Note that after Mesos has determined that an + * executor has terminated any tasks that the executor did not send + * terminal status updates for (e.g. TASK_KILLED, TASK_FINISHED, + * TASK_FAILED, etc) a TASK_LOST status update will be created. + * + * @param driver The executor driver that should terminate. + * + * @see ExecutorDriver + */ + void shutdown(ExecutorDriver driver); + + /** + * Invoked when a fatal error has occurred with the executor and/or + * executor driver. The driver will be aborted BEFORE invoking this + * callback. + * + * @param driver The executor driver that was aborted due to this error. + * @param message The error message. 
+ * + * @see ExecutorDriver + */ + void error(ExecutorDriver driver, String message); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java b/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java new file mode 100644 index 0000000..68d3ea9 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/ExecutorDriver.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos; + +import org.apache.mesos.Protos.*; + +/** + * Abstract interface for connecting an executor to Mesos. This + * interface is used both to manage the executor's lifecycle (start + * it, stop it, or wait for it to finish) and to interact with Mesos + * (e.g., send status updates, send framework messages, etc.). + */ +public interface ExecutorDriver { + /** + * Starts the executor driver. This needs to be called before any + * other driver calls are made. + * + * @return The state of the driver after the call. 
+ * + * @see Status + */ + public Status start(); + + /** + * Stops the executor driver. + * + * @return The state of the driver after the call. + * + * @see Status + */ + public Status stop(); + + /** + * Aborts the driver so that no more callbacks can be made to the + * executor. The semantics of abort and stop have deliberately been + * separated so that code can detect an aborted driver (i.e., via + * the return status of {@link ExecutorDriver#join}, see below), + * and instantiate and start another driver if desired (from within + * the same process ... although this functionality is currently not + * supported for executors). + * + * @return The state of the driver after the call. + * + * @see Status + */ + public Status abort(); + + /** + * Waits for the driver to be stopped or aborted, possibly + * _blocking_ the current thread indefinitely. The return status of + * this function can be used to determine if the driver was aborted + * (see mesos.proto for a description of Status). + * + * @return The state of the driver after the call. + * + * @see Status + */ + public Status join(); + + /** + * Starts and immediately joins (i.e., blocks on) the driver. + * + * @return The state of the driver after the call. + * + * @see Status + */ + public Status run(); + + /** + * Sends a status update to the framework scheduler, retrying as + * necessary until an acknowledgement has been received or the + * executor is terminated (in which case, a TASK_LOST status update + * will be sent). See {@link Scheduler#statusUpdate} for more + * information about status update acknowledgements. + * + * @param status The status update to send. + * + * @return The state of the driver after the call. + * + * @see Status + */ + public Status sendStatusUpdate(TaskStatus status); + + /** + * Sends a message to the framework scheduler. These messages are + * best effort; do not expect a framework message to be + * retransmitted in any reliable fashion. 
+ * + * @param data The message payload. + * + * @return The state of the driver after the call. + * + * @see Status + */ + public Status sendFrameworkMessage(byte[] data); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/Log.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/Log.java b/myriad-commons/src/main/java/org/apache/mesos/Log.java new file mode 100644 index 0000000..6603263 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/Log.java @@ -0,0 +1,468 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos; + +import java.io.Closeable; +import java.io.IOException; + +import java.util.List; +import java.util.Set; + +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +/** + * Provides access to a distributed append only log. The log can be + * read from using a {@link Log.Reader} and written to using a + * {@link Log.Writer}. 
+ * + * <p>Both the <i>Reader</i> and <i>Writer</i> will require a <i>quorum</i> + * which defines the <i>ratio of active Mesos Masters</i> that need to be + * available for a successful read or write. The <i>quorum</i> will be satisfied + * when the number of <i>active Masters</i> is greater than the given + * <i>number</i>: + * <pre>{@code + * Quorum > (Number of Masters)/2 + * }</pre> + * + * <p>If a <i>read</i> or <i>write</i> is executed the operation will wait + * until there is <i>quorum</i> to succeed. + */ +public class Log { + static { + MesosNativeLibrary.load(); + } + + /** + * An opaque identifier of a log entry's position within the + * log. Can be used to indicate {@link Log.Reader#read read} ranges and + * {@link Log.Writer#truncate truncation} locations. + */ + public static class Position implements Comparable<Position> { + @Override + public int compareTo(Position that) { + return Long.signum(value - that.value); + } + + @Override + public boolean equals(Object that) { + return that instanceof Position && value == ((Position) that).value; + } + + @Override + public String toString() { + return "Position " + value; + } + + /** + * Returns an "identity" of this position, useful for serializing + * to logs or across communication mediums. + * + * @return The identity in bytes. + */ + public byte[] identity() { + byte[] bytes = new byte[8]; + bytes[0] = (byte) (0xff & (value >> 56)); + bytes[1] = (byte) (0xff & (value >> 48)); + bytes[2] = (byte) (0xff & (value >> 40)); + bytes[3] = (byte) (0xff & (value >> 32)); + bytes[4] = (byte) (0xff & (value >> 24)); + bytes[5] = (byte) (0xff & (value >> 16)); + bytes[6] = (byte) (0xff & (value >> 8)); + bytes[7] = (byte) (0xff & value); + return bytes; + } + + /** + * Creates a position identified by an integral {@code value}. + * <p> + * Positions are typically only created by the log implementation. Log + * users should only ever need to call this constructor in unit tests. 
+ * + * @param value The marker for this position in the log. + */ + public Position(long value) { + this.value = value; + } + + private final long value; + } + + /** + * Represents an opaque data entry in the {@link Log} with a + * {@link Log.Position}. + */ + public static class Entry { + /** + * The position of this entry. + * @see Position + */ + public final Position position; + /** The data at the given position.*/ + public final byte[] data; + + /** + * Creates a log entry. + * <p> + * Entries are typically only created by the log implementation. Log + * users should only ever need to call this constructor in unit tests. + * + * @param position The unique position of this entry within the log. + * @param data The content stored in this entry. + */ + public Entry(Position position, byte[] data) { + this.position = position; + this.data = data; + } + } + + /** + * An exception that gets thrown when an error occurs while + * performing a read or write operation. + */ + public static class OperationFailedException extends Exception { + /** + * @param message The message for this exception. + */ + public OperationFailedException(String message) { + super(message); + } + + /** + * @param message The message for this exception. + * @param cause The underlying reason this exception was generated. + */ + public OperationFailedException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * An exception that gets thrown when a writer no longer has the + * ability to perform operations (e.g., because it was superseded by + * another writer). + */ + public static class WriterFailedException extends Exception { + /** + * @param message The message for this exception. + */ + public WriterFailedException(String message) { + super(message); + } + + /** + * @param message The message for this exception. + * @param cause The underlying reason this exception was generated. 
+ */ + public WriterFailedException(String message, Throwable cause) { + super(message, cause); + } + } + + /** + * Provides read access to the {@link Log}. This class is safe for + * use from multiple threads and for the life of the log regardless + * of any exceptions thrown from its methods. + */ + public static class Reader { + /** + * Creates a reader that will access the given instance of the Log. + * The log reference is stored in the reader and then passed to the + * native {@code initialize} method. + * + * @param log The log that this reader will access. + */ + public Reader(Log log) { + this.log = log; + initialize(log); + } + + /** + * Attempts to read from the log between the specified positions + * (inclusive). If either of the positions is invalid, an + * {@link OperationFailedException} will get thrown. Unfortunately, this + * will also get thrown in other circumstances (e.g., disk + * failure) and therefore it is currently impossible to tell these + * two cases apart. + * + * @param from Where to start reading. + * @param to Where to finish reading. + * @param timeout Max number of time units to wait before a + * {@link TimeoutException}. + * @param unit Type of units used for the timeout, e.g. seconds, + * minutes, etc. + * + * @return The list of entries fetched from the Log. + * + * @throws TimeoutException If the read doesn't happen before the + * timeout. + * @throws OperationFailedException If the read fails, e.g. because a + * position is invalid or because of + * another failure such as a disk error. + * @see Position + * @see TimeUnit + */ + public native List<Entry> read(Position from, + Position to, + long timeout, + TimeUnit unit) + throws TimeoutException, OperationFailedException; + + /** + * Returns the beginning position of the log (might be out of date + * with respect to another replica). + * + * @return The beginning position of the log. + */ + public native Position beginning(); + + /** + * Returns the ending position of the log (might be out of date + * with respect to another replica). 
+ * + * @return The ending position of the log + */ + public native Position ending(); + + /** + * Attempts to catch up positions from the log for reading. + * + * @param timeout Max number of time units to wait before a + * {@link TimeoutException}. + * @param unit Type of time units used for the timeout, e.g. seconds, + * minutes, etc. + * + * @return The ending position of the caught-up range. + * + * @throws TimeoutException If the catch-up doesn't happen before + * the timeout. + * @throws OperationFailedException If the catch-up fails. + */ + public native Position catchup(long timeout, TimeUnit unit) + throws TimeoutException, OperationFailedException; + + protected native void initialize(Log log); + + protected native void finalize(); + + private Log log; // Keeps the log from getting garbage collected. + private long __log; + private long __reader; + } + + /** + * Provides write access to the {@link Log}. This class is not safe + * for use from multiple threads and instances should be thrown out + * after any {@link WriterFailedException} is thrown. + */ + public static class Writer { + /** + * Constructs a writer linked to the given {@link Log}. The log reference + * is stored in the writer and all arguments are passed on to the native + * {@code initialize} method. + * + * @param log The log that this writer will access. + * @param timeout Max number of time units to wait before a + * {@link TimeoutException}. + * @param unit Type of time units used for the timeout, e.g. seconds, + * minutes, etc. + * @param retries Number of retries. + * + * @see TimeUnit + */ + public Writer(Log log, long timeout, TimeUnit unit, int retries) { + this.log = log; + initialize(log, timeout, unit, retries); + } + + /** + * Attempts to append to the log with the specified data returning + * the new end position of the log if successful. + * + * @param data Data to append to the log. + * @param timeout Max number of time units to wait before a + * {@link TimeoutException}. + * @param unit Type of time units used for the timeout, e.g. seconds, + * minutes, etc. + * + * @return The new end-position. 
+ * + * @throws TimeoutException If the append doesn't happen before the + * timeout. + * @throws WriterFailedException If the append fails due that the writer + * no longer has the ability to perform its + * operations (e.g., because it was + * superseded by another writer). + * @see TimeUnit + * @see WriterFailedException + */ + public native Position append(byte[] data, long timeout, TimeUnit unit) + throws TimeoutException, WriterFailedException; + + /** + * Attempts to truncate the log (from the beginning to the + * specified position exclusive). If the position is invalid, a + * {@link WriterFailedException} will get thrown. Unfortunately, this will + * also get thrown in other circumstances (e.g., disk failure) and + * therefore it is currently impossible to tell these two cases + * apart. + * + * @param to The log will be truncated up to this point. + * @param timeout Max number of time units to wait before a + * {@link TimeoutException}. + * @param unit Type of time units used for the timeout, e.g. seconds, + * minutes, etc. + * + * @return The position after the truncation. + * + * @throws TimeoutException If the truncation doesn't happen before + * the timeout. + * @throws WriterFailedException If the truncation fails due to an invalid + * position or if the writer no longer has + * the ability to perform its operations + * (e.g., because it was superseded by + * another writer). + */ + // TODO(benh): Throw both OperationFailedException and WriterFailedException + // to differentiate the need for a new writer from a bad + // position, or a bad disk, etc. + public native Position truncate(Position to, long timeout, TimeUnit unit) + throws TimeoutException, WriterFailedException; + + protected native void initialize(Log log, + long timeout, + TimeUnit unit, + int retries); + + protected native void finalize(); + + private Log log; // Keeps the log from getting garbage collected. 
+ private long __log; + private long __writer; + } + + /** + * Creates a new replicated log that assumes the specified quorum + * size, is backed by a file at the specified path, and coordinates + * with other replicas via the set of process PIDs. + * + * @param quorum The quorum size. + * @param path Path to the file backing this log. + * @param pids PIDs of the replicas to coordinate with. + */ + public Log(int quorum, + String path, + Set<String> pids) { + initialize(quorum, path, pids); + } + + /** + * Creates a new replicated log that assumes the specified quorum + * size, is backed by a file at the specified path, and coordinates + * with other replicas associated with the specified ZooKeeper + * servers, timeout, and znode (or ZooKeeper name space). + * + * @param quorum The quorum size. + * @param path Path to the file backing this log. + * @param servers List of ZooKeeper servers (e.g., 'ip1:port1,ip2:port2'). + * @param timeout Max number of time units to wait before a + * {@link TimeoutException}. + * @param unit Type of time units used for the timeout, e.g. seconds, + * minutes, etc. + * @param znode Path to znode where "state" should be rooted. + */ + public Log(int quorum, + String path, + String servers, + long timeout, + TimeUnit unit, + String znode) { + initialize(quorum, path, servers, timeout, unit, znode); + } + + /** + * Creates a new replicated log that assumes the specified quorum + * size, is backed by a file at the specified path, and coordinates + * with other replicas associated with the specified ZooKeeper + * servers, timeout, and znode (or ZooKeeper name space). + * + * @param quorum The quorum size. + * @param path Path to the file backing this log. + * @param servers ZooKeeper servers/connection string. + * @param timeout Max number of time units to wait before a + * {@link TimeoutException}. + * @param unit Type of time units used for the timeout, e.g. seconds, + * minutes, etc. + * @param znode The ZooKeeper name space. 
+ * @param scheme Authentication scheme (e.g., "digest"). + * @param credentials Authentication credentials (e.g., "user:pass"). + */ + public Log(int quorum, + String path, + String servers, + long timeout, + TimeUnit unit, + String znode, + String scheme, + byte[] credentials) { + initialize(quorum, path, servers, timeout, unit, znode, scheme, credentials); + } + + /** + * Returns a position based off of the bytes recovered from + * {@link Position#identity()}. The first 8 bytes are read most significant + * byte first; any further bytes are ignored. + * + * @param identity Identity, in bytes, of the position. + * + * @return The position. + */ + public Position position(byte[] identity) { + long packed = 0L; + for (int index = 0; index < 8; index++) { + packed = (packed << 8) | (identity[index] & 0xff); + } + return new Position(packed); + } + + protected native void initialize(int quorum, + String path, + Set<String> pids); + + protected native void initialize(int quorum, + String path, + String servers, + long timeout, + TimeUnit unit, + String znode); + + protected native void initialize(int quorum, + String path, + String servers, + long timeout, + TimeUnit unit, + String znode, + String scheme, + byte[] credentials); + + protected native void finalize(); + + private long __log; +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java b/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java new file mode 100644 index 0000000..b52a81b --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/MesosExecutorDriver.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos; + +import org.apache.mesos.Protos.*; + +/** + * Concrete implementation of an ExecutorDriver that connects an + * Executor with a Mesos slave. The MesosExecutorDriver is + * thread-safe. + * <p> + * The driver is responsible for invoking the Executor callbacks as it + * communicates with the Mesos slave. + * </p> + * <p> + * Note that blocking on the MesosExecutorDriver (e.g., via {@link + * #join}) doesn't affect the executor callbacks in any way because + * they are handled by a different thread. + * </p> + * <p> + * Note that the driver uses GLOG to do its own logging. GLOG flags can + * be set via environment variables, prefixing the flag name with + * "GLOG_", e.g., "GLOG_v=1". For Mesos-specific logging flags see + * src/logging/flags.hpp. Mesos flags can also be set via environment + * variables, prefixing the flag name with "MESOS_", e.g., + * "MESOS_QUIET=1". + * </p> + * <p> + * See src/examples/java/TestExecutor.java for an example of using the + * MesosExecutorDriver. + * </p> + */ +public class MesosExecutorDriver implements ExecutorDriver { + static { + MesosNativeLibrary.load(); + } + + /** + * Creates a new driver that uses the specified Executor. A + * {@link NullPointerException} is thrown if the executor is null. 
+ * + * @param executor The instance of the executor that will be used + * to connect to the slave. + * + * @see Executor + */ + public MesosExecutorDriver(Executor executor) { + if (executor == null) { + throw new NullPointerException("Not expecting a null Executor"); + } + + this.executor = executor; + + initialize(); + } + + /** + * Native driver operations. See ExecutorDriver for descriptions of these. + * + * @see ExecutorDriver + */ + public native Status start(); + public native Status stop(); + public native Status abort(); + public native Status join(); + /** + * Calls {@link #start()} and, if that returned {@code DRIVER_RUNNING}, + * returns the result of {@link #join()}; otherwise returns the status + * from {@code start()} unchanged. + */ + public Status run() { + Status status = start(); + return status != Status.DRIVER_RUNNING ? status : join(); + } + + public native Status sendStatusUpdate(TaskStatus status); + public native Status sendFrameworkMessage(byte[] data); + + protected native void initialize(); + protected native void finalize(); + + private final Executor executor; + + private long __executor; + private long __driver; +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java b/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java new file mode 100644 index 0000000..4ce325c --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/MesosNativeLibrary.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos; + +public class MesosNativeLibrary { + /** + * Represents a 'libmesos' version with Major, Minor, and Patch versions. We + * use a class here to make it easier to do version compatibility checking. + * For example: + * <pre> + * {@code + * static Version BugFixVersion = new Version(0, 22, 1); + * public static void myFunction() { + * if (version().compareTo(BugFixVersion) >= 0) { + * // New behavior with bug fix. + * } else { + * // Old behavior for backwards compatibility. + * } + * } + * } + * </pre> + */ + public static class Version implements Comparable<Version> { + /** + * Creates a version from its three components. + * + * @param major The major version, must not be negative. + * @param minor The minor version, must not be negative. + * @param patch The patch version, must not be negative. + * + * @throws IllegalArgumentException If any component is negative. + */ + public Version(long major, long minor, long patch) { + if (major < 0) { + throw new IllegalArgumentException( + "Major version must not be negative"); + } + + if (minor < 0) { + throw new IllegalArgumentException( + "Minor version must not be negative"); + } + + if (patch < 0) { + throw new IllegalArgumentException( + "Patch version must not be negative"); + } + + this.major = major; + this.minor = minor; + this.patch = patch; + } + + /** + * Creates a version with the given major and minor components and a + * patch component of 0. + */ + public Version(long major, long minor) { + this(major, minor, 0); + } + + /** + * Creates a version with the given major component and minor and patch + * components of 0. + */ + public Version(long major) { + this(major, 0, 0); + } + + /** + * Typed equality check: true if 'other' is not null and has the same + * major, minor and patch components. + */ + public boolean equals(Version other) { + return other != null && + major == other.major && + minor == other.minor && + patch == other.patch; + } + + /** + * Overrides {@link Object#equals(Object)} so that callers which only see + * an {@code Object} (e.g. collections) also compare versions by value + * instead of by identity. Delegates to {@link #equals(Version)}. + */ + @Override + public boolean equals(Object other) { + return other instanceof Version && equals((Version) other); + } + + /** + * Hash code consistent with {@link #equals(Object)}: computed from the + * major, minor and patch components. + */ + @Override + public int hashCode() { + int result = (int) (major ^ (major >>> 32)); + result = 31 * result + (int) (minor ^ (minor >>> 32)); + result = 31 * result + (int) (patch ^ (patch >>> 32)); + return result; + } + + /** + * Compare this version to an 'other' one. The comparison is done + * lexicographically. This returns -1 if this version is 'lesser' than the + * other, 0 if they are equivalent, and 1 if this version is 'greater'. 
+ */ + @Override + public int compareTo(Version other) { + if (other == null) { + throw new IllegalArgumentException("other Version must not be null"); + } + + // Walk the components from most to least significant; the first + // component that differs decides the ordering. + if (major != other.major) { + return major < other.major ? -1 : 1; + } + + if (minor != other.minor) { + return minor < other.minor ? -1 : 1; + } + + if (patch != other.patch) { + return patch < other.patch ? -1 : 1; + } + + return 0; + } + + /** + * Convenience wrapper around 'compareTo': true only if 'this' version is + * strictly 'less than' the 'other' version (equal versions yield false). + */ + public boolean before(Version other) { + return compareTo(other) < 0; + } + + /** + * Convenience wrapper around 'compareTo': true only if 'this' version is + * strictly 'greater than' the 'other' version (equal versions yield + * false). + */ + public boolean after(Version other) { + return compareTo(other) > 0; + } + + public final long major; + public final long minor; + public final long patch; + } + + /** + * Attempts to load the native library (if it was not previously loaded) + * from the given path. If the path is null 'java.library.path' is used to + * load the library. + */ + public static synchronized void load(String path) { + // Our JNI library will actually set 'loaded' to true once it is + // loaded, that way the library can get loaded by a user via + // 'System.load' in the event that they want to specify an + // absolute path and we won't try and reload the library ourselves + // (which would probably fail because 'java.library.path' might + // not be set). + if (loaded) { + return; + } + + // In some circumstances, such as when sandboxed class loaders are used, + // the current thread's context class loader will not be able to see + // MesosNativeLibrary (even when executing this code!). 
+ // We therefore, temporarily swap the thread's context class loader with + // the class loader that loaded this class, for the duration of the native + // library load. + ClassLoader contextClassLoader = + Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader( + MesosNativeLibrary.class.getClassLoader()); + try { + if (path != null) { + System.load(path); + } else { + // TODO(tillt): Change the default fallback to JNI specific library + // once libmesos has been split. + System.loadLibrary("mesos"); + } + } catch (UnsatisfiedLinkError error) { + System.err.println("Failed to load native Mesos library from " + + (path != null ? path : System.getProperty("java.library.path"))); + throw error; + } finally { + Thread.currentThread().setContextClassLoader(contextClassLoader); + } + } + + public static void load() { + // Try to get the JNI specific library path from the environment. + String path = System.getenv("MESOS_NATIVE_JAVA_LIBRARY"); + + // As a fallback, use deprecated environment variable to extract that path. + if (path == null) { + path = System.getenv("MESOS_NATIVE_LIBRARY"); + if (path != null) { + System.out.println("Warning: MESOS_NATIVE_LIBRARY is deprecated, " + + "use MESOS_NATIVE_JAVA_LIBRARY instead. Future releases will " + + "not support JNI bindings via MESOS_NATIVE_LIBRARY."); + } + } + + load(path); + } + + /** + * Returns the version of the native loaded library, or throws a + * runtime exception if the library is not loaded. This was + * introduced in MESOS 0.22.1. Any version prior to that will be + * 0.0.0. This means you should not make version specific decision + * before the 0.22.1 version boundary. For example, if you found a + * bug that was fixed in 0.19.0, you will *not* be able to perform + * the following check correctly: + * + * if (version().before(new Version(0, 19, 0))) { + * ... + * } + * + * This predicate will return true for all versions up until 0.22.1. 
+ */ + public static synchronized Version version() { + // Since we allow 'load' to be called with a parameter, we can not load on + // behalf of the user here. Instead, we throw an exception if the library + // has not been loaded. + if (!loaded) { + throw new RuntimeException("'libmesos' not loaded"); + } + + if (version == null) { + // Try to load the libmesos version identifier. If we get an + // 'UnsatisfiedLinkError' then this means we are loading a 'libmesos' with + // a version prior to 0.22.1, which is when the 'MAJOR', 'MINOR', and + // 'PATCH' version identifiers were introduced. + try { + version = _version(); + } catch (UnsatisfiedLinkError error) { + System.err.println( + "WARNING: using an old version of 'libmesos'" + + " without proper version information: " + error.getMessage()); + + // If we're using a version of 'libmesos' less than 0.22.1, then we set + // the version to 0.0.0. + version = new Version(0, 0, 0); + } + } + + return version; + } + + public static final String VERSION = "1.5.0"; + + private static Version version = null; + + private static boolean loaded = false; + + /** + * Native implementation of 'libmesos' version identifier function. + */ + private static native Version _version(); +} http://git-wip-us.apache.org/repos/asf/incubator-myriad/blob/b5b468b9/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java ---------------------------------------------------------------------- diff --git a/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java b/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java new file mode 100644 index 0000000..4f61da2 --- /dev/null +++ b/myriad-commons/src/main/java/org/apache/mesos/MesosSchedulerDriver.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mesos; + +import java.util.Collection; + +import org.apache.mesos.Protos.*; + +/** + * Concrete implementation of a SchedulerDriver that connects a + * Scheduler with a Mesos master. The MesosSchedulerDriver is + * thread-safe. + * <p> + * Note that scheduler failover is supported in Mesos. After a + * scheduler is registered with Mesos it may fail over (to a new + * process on the same machine or across multiple machines) by + * creating a new driver with the ID given to it in {@link + * Scheduler#registered}. + * <p> + * The driver is responsible for invoking the Scheduler callbacks as + * it communicates with the Mesos master. + * <p> + * Note that blocking on the MesosSchedulerDriver (e.g., via {@link + * #join}) doesn't affect the scheduler callbacks in any way because + * they are handled by a different thread. + * <p> + * Note that the driver uses GLOG to do its own logging. GLOG flags can + * be set via environment variables, prefixing the flag name with + * "GLOG_", e.g., "GLOG_v=1". For Mesos-specific logging flags see + * src/logging/flags.hpp. Mesos flags can also be set via environment + * variables, prefixing the flag name with "MESOS_", e.g., + * "MESOS_QUIET=1". + * <p> + * See src/examples/java/TestFramework.java for an example of using + * the MesosSchedulerDriver. 
+ */ +public class MesosSchedulerDriver implements SchedulerDriver { + static { + MesosNativeLibrary.load(); + } + + /** + * Creates a new driver for the specified scheduler. The master + * should be one of: + * <pre> + * {@code + * host:port + * zk://host1:port1,host2:port2,.../path + * zk://username:password@host1:port1,host2:port2,.../path + * file:///path/to/file (where file contains one of the above) + * } + * </pre> + * <p> + * The driver will attempt to "failover" if the specified + * FrameworkInfo includes a valid FrameworkID. + * <p> + * Any Mesos configuration options are read from environment + * variables, as well as any configuration files found through the + * environment variables. + * <p> + * Implicit acknowledgements are enabled and no credential is used. + * + * @param scheduler The scheduler implementation whose callbacks are invoked + * upon scheduler events. + * @param framework The frameworkInfo describing the current framework. + * @param master The address to the currently active Mesos master. + */ + // TODO(vinod): Deprecate this in favor of the constructor that takes + // 'credential' as parameter. + public MesosSchedulerDriver(Scheduler scheduler, + FrameworkInfo framework, + String master) { + this(scheduler, framework, master, true); + } + + /** + * Same as the other constructors, except that it accepts the newly + * introduced 'credential' parameter. Implicit acknowledgements are + * enabled. + * + * @param scheduler The scheduler implementation whose callbacks are invoked + * upon scheduler events. + * @param framework The frameworkInfo describing the current framework. 
+ * @param master The address to the currently active Mesos master. + * @param credential The credentials that will be used to authenticate + * calls from this scheduler. + */ + public MesosSchedulerDriver(Scheduler scheduler, + FrameworkInfo framework, + String master, + Credential credential) { + this(scheduler, framework, master, true, credential); + } + + /** + * Same as the other constructors, except that it accepts the newly + * introduced 'implicitAcknowledgements' parameter. No credential is used. + * + * @param scheduler The scheduler implementation whose callbacks are invoked + * upon scheduler events. + * @param framework The frameworkInfo describing the current framework. + * @param master The address to the currently active Mesos master. + * @param implicitAcknowledgements Whether the driver should send + * acknowledgements on behalf of the scheduler. Setting this to + * false allows schedulers to perform their own acknowledgements, + * which enables asynchronous / batch processing of status updates. 
+ */ + public MesosSchedulerDriver(Scheduler scheduler, + FrameworkInfo framework, + String master, + boolean implicitAcknowledgements) { + + if (scheduler == null) { + throw new NullPointerException("Not expecting a null Scheduler"); + } + + if (framework == null) { + throw new NullPointerException("Not expecting a null FrameworkInfo"); + } + + if (master == null) { + throw new NullPointerException("Not expecting a null master"); + } + + this.scheduler = scheduler; + this.framework = framework; + this.master = master; + this.implicitAcknowledgements = implicitAcknowledgements; + this.credential = null; + + initialize(); + } + + /** + * Same as the other constructors, except that it accepts the newly + * introduced 'implicitAcknowledgements' and 'credential' parameters. + * + * @param scheduler The scheduler implementation whose callbacks are invoked + * upon scheduler events. + * @param framework The frameworkInfo describing the current framework. + * @param master The address to the currently active Mesos master. + * @param implicitAcknowledgements Whether the driver should send + * acknowledgements on behalf of the scheduler. Setting this to + * false allows schedulers to perform their own acknowledgements, + * which enables asynchronous / batch processing of status updates. + * @param credential The credentials that will be used to authenticate + * calls from this scheduler. 
+ */ + public MesosSchedulerDriver(Scheduler scheduler, + FrameworkInfo framework, + String master, + boolean implicitAcknowledgements, + Credential credential) { + + if (scheduler == null) { + throw new NullPointerException("Not expecting a null Scheduler"); + } + + if (framework == null) { + throw new NullPointerException("Not expecting a null FrameworkInfo"); + } + + if (master == null) { + throw new NullPointerException("Not expecting a null master"); + } + + if (credential == null) { + throw new NullPointerException("Not expecting a null credential"); + } + + this.scheduler = scheduler; + this.framework = framework; + this.master = master; + this.implicitAcknowledgements = implicitAcknowledgements; + this.credential = credential; + + initialize(); + } + + public native Status start(); + + public native Status stop(boolean failover); + + /** + * Equivalent to {@code stop(false)}. + */ + public Status stop() { + return stop(false); + } + + public native Status abort(); + + public native Status join(); + + /** + * Calls {@link #start()} and, if that returned {@code DRIVER_RUNNING}, + * returns the result of {@link #join()}; otherwise returns the status + * from {@code start()} unchanged. + */ + public Status run() { + Status status = start(); + return status != Status.DRIVER_RUNNING ? 
status : join(); + } + + public native Status requestResources(Collection<Request> requests); + + /** + * Same as the three-argument overload, called with a Filters message + * built from an empty builder. + */ + public Status launchTasks(OfferID offerId, + Collection<TaskInfo> tasks) { + return launchTasks(offerId, tasks, Filters.newBuilder().build()); + } + + public native Status launchTasks(OfferID offerId, + Collection<TaskInfo> tasks, + Filters filters); + + /** + * Same as the three-argument overload, called with a Filters message + * built from an empty builder. + */ + public Status launchTasks(Collection<OfferID> offerIds, + Collection<TaskInfo> tasks) { + return launchTasks(offerIds, tasks, Filters.newBuilder().build()); + } + + public native Status launchTasks(Collection<OfferID> offerIds, + Collection<TaskInfo> tasks, + Filters filters); + + public native Status killTask(TaskID taskId); + + public native Status acceptOffers(Collection<OfferID> offerIds, + Collection<Offer.Operation> operations, + Filters filters); + + /** + * Same as the two-argument overload, called with a Filters message + * built from an empty builder. + */ + public Status declineOffer(OfferID offerId) { + return declineOffer(offerId, Filters.newBuilder().build()); + } + + public native Status declineOffer(OfferID offerId, Filters filters); + + public native Status reviveOffers(); + + public native Status suppressOffers(); + + public native Status acknowledgeStatusUpdate(TaskStatus status); + + public native Status sendFrameworkMessage(ExecutorID executorId, + SlaveID slaveId, + byte[] data); + + public native Status reconcileTasks(Collection<TaskStatus> statuses); + + protected native void initialize(); + protected native void finalize(); + + private final Scheduler scheduler; + private final FrameworkInfo framework; + private final String master; + private final boolean implicitAcknowledgements; + private final Credential credential; + + private long __scheduler; + private long __driver; +}