Added Java binding for the new acceptOffers API. Review: https://reviews.apache.org/r/31873
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6a6cb4b5 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6a6cb4b5 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6a6cb4b5 Branch: refs/heads/master Commit: 6a6cb4b58874f0f4f934cd52024a3390dea03ed2 Parents: 4f455c2 Author: Jie Yu <[email protected]> Authored: Wed Mar 11 18:16:30 2015 -0700 Committer: Jie Yu <[email protected]> Committed: Wed Mar 11 18:21:53 2015 -0700 ---------------------------------------------------------------------- src/examples/java/TestFramework.java | 22 ++++++- src/java/jni/construct.cpp | 21 ++++++ .../org_apache_mesos_MesosSchedulerDriver.cpp | 67 ++++++++++++++++++++ .../org/apache/mesos/MesosSchedulerDriver.java | 4 ++ .../src/org/apache/mesos/SchedulerDriver.java | 24 +++++++ 5 files changed, 135 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/examples/java/TestFramework.java ---------------------------------------------------------------------- diff --git a/src/examples/java/TestFramework.java b/src/examples/java/TestFramework.java index ffc90b5..9e95369 100644 --- a/src/examples/java/TestFramework.java +++ b/src/examples/java/TestFramework.java @@ -64,7 +64,7 @@ public class TestFramework { double MEM_PER_TASK = 128; for (Offer offer : offers) { - List<TaskInfo> tasks = new ArrayList<TaskInfo>(); + Offer.Operation.Launch.Builder launch = Offer.Operation.Launch.newBuilder(); double offerCpus = 0; double offerMem = 0; for (Resource resource : offer.getResourcesList()) { @@ -105,13 +105,29 @@ public class TestFramework { .setExecutor(ExecutorInfo.newBuilder(executor)) .build(); - tasks.add(task); + launch.addTaskInfos(TaskInfo.newBuilder(task)); remainingCpus -= CPUS_PER_TASK; remainingMem -= MEM_PER_TASK; } + + // NOTE: We use the new API `acceptOffers` here to launch tasks. The + // 'launchTasks' API will be deprecated. + List<OfferID> offerIds = new ArrayList<OfferID>(); + offerIds.add(offer.getId()); + + List<Offer.Operation> operations = new ArrayList<Offer.Operation>(); + + Offer.Operation operation = Offer.Operation.newBuilder() + .setType(Offer.Operation.Type.LAUNCH) + .setLaunch(launch) + .build(); + + operations.add(operation); + Filters filters = Filters.newBuilder().setRefuseSeconds(1).build(); - driver.launchTasks(offer.getId(), tasks, filters); + + driver.acceptOffers(offerIds, operations, filters); } } http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/jni/construct.cpp ---------------------------------------------------------------------- diff --git a/src/java/jni/construct.cpp b/src/java/jni/construct.cpp index e54c11e..1343208 100644 --- a/src/java/jni/construct.cpp +++ b/src/java/jni/construct.cpp @@ -380,3 +380,24 @@ Request construct(JNIEnv* env, jobject jobj) return request; } + + +template <> +Offer::Operation construct(JNIEnv* env, jobject jobj) +{ + jclass clazz = env->GetObjectClass(jobj); + + // byte[] data = obj.toByteArray(); + jmethodID toByteArray = env->GetMethodID(clazz, "toByteArray", "()[B"); + + jbyteArray jdata = (jbyteArray) env->CallObjectMethod(jobj, toByteArray); + + jbyte* data = env->GetByteArrayElements(jdata, NULL); + jsize length = env->GetArrayLength(jdata); + + const Offer::Operation& operation = parse<Offer::Operation>(data, length); + + env->ReleaseByteArrayElements(jdata, data, 0); + + return operation; +} http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp ---------------------------------------------------------------------- diff --git a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp index 4f0dad7..a89ebed 100644 --- a/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp +++ b/src/java/jni/org_apache_mesos_MesosSchedulerDriver.cpp @@ -874,6 +874,73 @@ JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_launchTasks /* * Class: org_apache_mesos_MesosSchedulerDriver + * Method: acceptOffers + * Signature: (Ljava/util/Collection;Ljava/util/Collection;Lorg/apache/mesos/Protos$Filters;)Lorg/apache/mesos/Protos/Status; + */ +JNIEXPORT jobject JNICALL Java_org_apache_mesos_MesosSchedulerDriver_acceptOffers__Ljava_util_Collection_2Ljava_util_Collection_2Lorg_apache_mesos_Protos_00024Filters_2 + (JNIEnv* env, jobject thiz, jobject jofferIds, jobject joperations, jobject jfilters) +{ + // Construct C++ OfferIDs from each Java OfferIDs. + vector<OfferID> offers; + jclass clazz = env->GetObjectClass(jofferIds); + + // Iterator iterator = offerIds.iterator(); + jmethodID iterator = + env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;"); + jobject jiterator = env->CallObjectMethod(jofferIds, iterator); + + clazz = env->GetObjectClass(jiterator); + + // while (iterator.hasNext()) { + jmethodID hasNext = env->GetMethodID(clazz, "hasNext", "()Z"); + + jmethodID next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;"); + + while (env->CallBooleanMethod(jiterator, hasNext)) { + // Object offerId = iterator.next(); + jobject jofferId = env->CallObjectMethod(jiterator, next); + offers.push_back(construct<OfferID>(env, jofferId)); + } + + // Construct C++ Offer::Operations from each Java Offer.Operations. + vector<Offer::Operation> operations; + clazz = env->GetObjectClass(joperations); + + // Iterator iterator = operations.iterator(); + iterator = env->GetMethodID(clazz, "iterator", "()Ljava/util/Iterator;"); + jiterator = env->CallObjectMethod(joperations, iterator); + + clazz = env->GetObjectClass(jiterator); + + // while (iterator.hasNext()) { + hasNext = env->GetMethodID(clazz, "hasNext", "()Z"); + + next = env->GetMethodID(clazz, "next", "()Ljava/lang/Object;"); + + while (env->CallBooleanMethod(jiterator, hasNext)) { + // Object operation = iterator.next(); + jobject joperation = env->CallObjectMethod(jiterator, next); + operations.push_back(construct<Offer::Operation>(env, joperation)); + } + + // Construct C++ Filters from the Java Filters. + const Filters& filters = construct<Filters>(env, jfilters); + + // Now invoke the underlying driver. + clazz = env->GetObjectClass(thiz); + + jfieldID __driver = env->GetFieldID(clazz, "__driver", "J"); + MesosSchedulerDriver* driver = + (MesosSchedulerDriver*) env->GetLongField(thiz, __driver); + + Status status = driver->acceptOffers(offers, operations, filters); + + return convert<Status>(env, status); +} + + +/* + * Class: org_apache_mesos_MesosSchedulerDriver * Method: declineOffer * Signature: (Lorg/apache/mesos/Protos/OfferID;Lorg/apache/mesos/Protos/Filters;)Lorg/apache/mesos/Protos/Status; */ http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/src/org/apache/mesos/MesosSchedulerDriver.java ---------------------------------------------------------------------- diff --git a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java index a1055a5..b9b2ea8 100644 --- a/src/java/src/org/apache/mesos/MesosSchedulerDriver.java +++ b/src/java/src/org/apache/mesos/MesosSchedulerDriver.java @@ -276,6 +276,10 @@ public class MesosSchedulerDriver implements SchedulerDriver { public native Status killTask(TaskID taskId); + public native Status acceptOffers(Collection<OfferID> offerIds, + Collection<Offer.Operation> operations, + Filters filters); + public Status declineOffer(OfferID offerId) { return declineOffer(offerId, Filters.newBuilder().build()); } http://git-wip-us.apache.org/repos/asf/mesos/blob/6a6cb4b5/src/java/src/org/apache/mesos/SchedulerDriver.java ---------------------------------------------------------------------- diff --git a/src/java/src/org/apache/mesos/SchedulerDriver.java b/src/java/src/org/apache/mesos/SchedulerDriver.java index d5b100a..183eec8 100644 --- a/src/java/src/org/apache/mesos/SchedulerDriver.java +++ b/src/java/src/org/apache/mesos/SchedulerDriver.java @@ -186,6 +186,30 @@ public interface SchedulerDriver { Status killTask(TaskID taskId); /** + * Accepts the given offers and performs a sequence of operations on + * those accepted offers. See Offer.Operation in mesos.proto for the + * set of available operations. Available resources are aggregated + * when multiple offers are provided. Note that all offers must + * belong to the same slave. Any unused resources will be considered + * declined. The specified filters are applied on all unused + * resources (see mesos.proto for a description of Filters). + * + * @param offerIds The collection of offer IDs. + * @param operations The collection of offer operations to perform. + * @param filters The filters to set for any remaining resources. + * + * @return The state of the driver after the call. + * + * @see OfferID + * @see Offer.Operation + * @see Filters + * @see Status + */ + Status acceptOffers(Collection<OfferID> offerIds, + Collection<Offer.Operation> operations, + Filters filters); + + /** * Declines an offer in its entirety and applies the specified * filters on the resources (see mesos.proto for a description of * Filters). Note that this can be done at any time, it is not
