Repository: mesos Updated Branches: refs/heads/master 9567fb420 -> 74ceb051c
Added an example test for the V0/V1 Mesos java implementation. Review: https://reviews.apache.org/r/50254 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/74ceb051 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/74ceb051 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/74ceb051 Branch: refs/heads/master Commit: 74ceb051c454326ace02a7c670c0ea4d911519c2 Parents: 68c27d1 Author: Anand Mazumdar <an...@apache.org> Authored: Tue Jul 19 23:36:34 2016 -0700 Committer: Anand Mazumdar <an...@apache.org> Committed: Tue Aug 2 08:25:48 2016 -0700 ---------------------------------------------------------------------- configure.ac | 2 + src/Makefile.am | 8 +- src/examples/java/V1TestFramework.java | 386 ++++++++++++++++++++++++++++ src/examples/java/v1-test-framework.in | 39 +++ src/tests/examples_tests.cpp | 4 + src/tests/java_v0_framework_test.sh | 41 +++ src/tests/java_v1_framework_test.sh | 40 +++ 7 files changed, 518 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/configure.ac ---------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index d213690..f82c631 100644 --- a/configure.ac +++ b/configure.ac @@ -1069,6 +1069,8 @@ __EOF__ [chmod +x src/examples/java/test-multiple-executors-framework]) AC_CONFIG_FILES([src/examples/java/test-log], [chmod +x src/examples/java/test-log]) + AC_CONFIG_FILES([src/examples/java/v1-test-framework], + [chmod +x src/examples/java/v1-test-framework]) AC_CONFIG_FILES([src/java/mesos.pom]) AC_DEFINE([MESOS_HAS_JAVA]) http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/Makefile.am ---------------------------------------------------------------------- diff --git a/src/Makefile.am b/src/Makefile.am index b9cc040..30d8f72 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1532,7 +1532,8 @@ EXAMPLES_SOURCE = \ $(srcdir)/examples/java/TestExecutor.java \ $(srcdir)/examples/java/TestFramework.java \ $(srcdir)/examples/java/TestLog.java \ - $(srcdir)/examples/java/TestMultipleExecutorsFramework.java + $(srcdir)/examples/java/TestMultipleExecutorsFramework.java \ + $(srcdir)/examples/java/V1TestFramework.java EXTRA_DIST += $(EXAMPLES_SOURCE) @@ -2236,7 +2237,8 @@ EXAMPLESCRIPTSJAVA = \ examples/java/test-exception-framework \ examples/java/test-framework \ examples/java/test-log \ - examples/java/test-multiple-executors-framework + examples/java/test-multiple-executors-framework \ + examples/java/v1-test-framework check_SCRIPTS += $(EXAMPLESCRIPTSJAVA) mesos_tests_DEPENDENCIES += $(EXAMPLESCRIPTSJAVA) @@ -2270,6 +2272,8 @@ dist_check_SCRIPTS += \ tests/java_exception_test.sh \ tests/java_framework_test.sh \ tests/java_log_test.sh \ + tests/java_v0_framework_test.sh \ + tests/java_v1_framework_test.sh \ tests/no_executor_framework_test.sh \ tests/persistent_volume_framework_test.sh \ tests/python_framework_test.sh \ http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/V1TestFramework.java ---------------------------------------------------------------------- diff --git a/src/examples/java/V1TestFramework.java b/src/examples/java/V1TestFramework.java new file mode 100644 index 0000000..a41edbc --- /dev/null +++ b/src/examples/java/V1TestFramework.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.File; +import java.io.IOException; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import java.util.concurrent.locks.*; + +import org.apache.mesos.v1.*; + +import org.apache.mesos.v1.Protos.*; + +import org.apache.mesos.v1.scheduler.JNIMesos; +import org.apache.mesos.v1.scheduler.Mesos; +import org.apache.mesos.v1.scheduler.Scheduler; +import org.apache.mesos.v1.scheduler.V0Mesos; + +import org.apache.mesos.v1.scheduler.Protos.Call; +import org.apache.mesos.v1.scheduler.Protos.Event; + +public class V1TestFramework { + static class TestScheduler implements Scheduler { + + public TestScheduler( + String master, + FrameworkInfo framework, + ExecutorInfo executor) { + this(master, framework, executor, 5); + } + + public TestScheduler( + String master, + FrameworkInfo framework, + ExecutorInfo executor, + int totalTasks) { + this.framework = framework; + this.executor = executor; + this.totalTasks = totalTasks; + this.state = State.DISCONNECTED; + } + + // TODO(anand): Synchronize on `state` instead. + @Override + public synchronized void connected(final Mesos mesos) { + System.out.println("Connected"); + + state = State.CONNECTED; + + retryTimer = new Timer(); + retryTimer.schedule(new TimerTask() { + @Override + public void run() { + doReliableRegistration(mesos); + } + }, 0, 1000); // Repeat every 1 second + } + + @Override + public synchronized void disconnected(Mesos mesos) { + System.out.println("Disconnected"); + + state = state.DISCONNECTED; + cancelRetryTimer(); + } + + @Override + public synchronized void received(Mesos mesos, Event event) { + switch (event.getType()) { + case SUBSCRIBED: { + frameworkId = event.getSubscribed().getFrameworkId(); + state = State.SUBSCRIBED; + + System.out.println("Subscribed with ID " + frameworkId); + break; + } + + case OFFERS: { + System.out.println("Received an OFFERS event"); + + offers(mesos, event.getOffers().getOffersList()); + break; + } + + case RESCIND: { + System.out.println("Received an RESCIND event"); + break; + } + + case UPDATE: { + System.out.println("Received an UPDATE event"); + + update(mesos, event.getUpdate().getStatus()); + break; + } + + case MESSAGE: { + System.out.println("Received a MESSAGE event"); + break; + } + + case FAILURE: { + System.out.println("Received a FAILURE event"); + break; + } + + case ERROR: { + System.out.println("Received an ERROR event"); + System.exit(1); + } + + case HEARTBEAT: { + // TODO(anand): Force reconnection with the master upon lack + // of heartbeats. + System.out.println("Received a HEARTBEAT event"); + break; + } + + case UNKNOWN: { + System.out.println("Received an UNKNOWN event"); + break; + } + } + } + + public synchronized void doReliableRegistration(Mesos mesos) { + if (state == State.SUBSCRIBED || state == State.DISCONNECTED) { + cancelRetryTimer(); + return; + } + + Call.Builder callBuilder = Call.newBuilder() + .setType(Call.Type.SUBSCRIBE) + .setSubscribe(Call.Subscribe.newBuilder() + .setFrameworkInfo(framework) + .build()); + + mesos.send(callBuilder.build()); + } + + private void cancelRetryTimer() { + // Cancel previously active timer (if one exists). + if (retryTimer != null) { + retryTimer.cancel(); + retryTimer.purge(); + } + + retryTimer = null; + } + + public void offers(Mesos mesos, List<Offer> offers) { + double CPUS_PER_TASK = 1; + double MEM_PER_TASK = 128; + + for (Offer offer : offers) { + Offer.Operation.Launch.Builder launch = + Offer.Operation.Launch.newBuilder(); + + double offerCpus = 0; + double offerMem = 0; + for (Resource resource : offer.getResourcesList()) { + if (resource.getName().equals("cpus")) { + offerCpus += resource.getScalar().getValue(); + } else if (resource.getName().equals("mem")) { + offerMem += resource.getScalar().getValue(); + } + } + + System.out.println( + "Received offer " + offer.getId().getValue() + " with cpus: " + + offerCpus + " and mem: " + offerMem); + + double remainingCpus = offerCpus; + double remainingMem = offerMem; + while (launchedTasks < totalTasks && + remainingCpus >= CPUS_PER_TASK && + remainingMem >= MEM_PER_TASK) { + TaskID taskId = TaskID.newBuilder() + .setValue(Integer.toString(launchedTasks++)).build(); + + System.out.println("Launching task " + taskId.getValue() + + " using offer " + offer.getId().getValue()); + + TaskInfo task = TaskInfo.newBuilder() + .setName("task " + taskId.getValue()) + .setTaskId(taskId) + .setAgentId(offer.getAgentId()) + .addResources(Resource.newBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder() + .setValue(CPUS_PER_TASK) + .build())) + .addResources(Resource.newBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder() + .setValue(MEM_PER_TASK) + .build())) + .setExecutor(ExecutorInfo.newBuilder(executor)) + .build(); + + launch.addTaskInfos(TaskInfo.newBuilder(task)); + + remainingCpus -= CPUS_PER_TASK; + remainingMem -= MEM_PER_TASK; + } + + mesos.send(Call.newBuilder() + .setType(Call.Type.ACCEPT) + .setFrameworkId(frameworkId) + .setAccept(Call.Accept.newBuilder() + .addOfferIds(offer.getId()) + .addOperations(Offer.Operation.newBuilder() + .setType(Offer.Operation.Type.LAUNCH) + .setLaunch(launch) + .build()) + .setFilters(Filters.newBuilder() + .setRefuseSeconds(1) + .build())) + .build()); + } + } + + public void update(Mesos mesos, TaskStatus status) { + System.out.println( + "Status update: task " + status.getTaskId().getValue() + + " is in state " + status.getState().getValueDescriptor().getName()); + + if (status.getState() == TaskState.TASK_FINISHED) { + finishedTasks++; + System.out.println("Finished tasks: " + finishedTasks); + if (finishedTasks == totalTasks) { + lock.lock(); + try { + finished = true; + finishedCondtion.signal(); + } finally { + lock.unlock(); + } + } + } + + if (status.getState() == TaskState.TASK_LOST || + status.getState() == TaskState.TASK_KILLED || + status.getState() == TaskState.TASK_FAILED) { + System.err.println( + "Aborting because task " + status.getTaskId().getValue() + + " is in unexpected state " + + status.getState().getValueDescriptor().getName() + + " with reason '" + + status.getReason().getValueDescriptor().getName() + "'" + + " from source '" + + status.getSource().getValueDescriptor().getName() + "'" + + " with message '" + status.getMessage() + "'"); + + System.exit(1); + } + + mesos.send(Call.newBuilder() + .setType(Call.Type.ACKNOWLEDGE) + .setFrameworkId(frameworkId) + .setAcknowledge(Call.Acknowledge.newBuilder() + .setAgentId(status.getAgentId()) + .setTaskId(status.getTaskId()) + .setUuid(status.getUuid()) + .build()) + .build()); + } + + private enum State { + DISCONNECTED, + CONNECTED, + SUBSCRIBED + } + + private FrameworkInfo framework; + private FrameworkID frameworkId; + private final ExecutorInfo executor; + private final int totalTasks; + private int launchedTasks = 0; + private int finishedTasks = 0; + private State state; + private Timer retryTimer = null; + } + + private static void usage() { + String name = V1TestFramework.class.getName(); + System.err.println("Usage: " + name + " master version{0,1}"); + } + + public static void main(String[] args) throws Exception { + if (args.length < 2 || args.length > 3) { + usage(); + System.exit(1); + } + + int version = Integer.parseInt(args[1]); + if (version != 0 && version != 1) { + usage(); + System.exit(1); + } + + String uri = new File("./test-executor").getCanonicalPath(); + + ExecutorInfo executor = ExecutorInfo.newBuilder() + .setExecutorId(ExecutorID.newBuilder().setValue("default")) + .setCommand(CommandInfo.newBuilder().setValue(uri)) + .setName("Test Executor (Java)") + .setSource("java_test") + .build(); + + FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder() + .setUser(System.getProperty("user.name", "default-user")) + .setName("V" + version + " Test Framework (Java)"); + + Credential.Builder credentialBuilder = null; + + if (System.getenv("DEFAULT_PRINCIPAL") != null) { + frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL")); + + if (System.getenv("DEFAULT_SECRET") != null) { + credentialBuilder = Credential.newBuilder() + .setPrincipal(System.getenv("DEFAULT_PRINCIPAL")) + .setSecret(System.getenv("DEFAULT_SECRET")); + } + } + + Scheduler scheduler = args.length == 2 + ? new TestScheduler(args[0], frameworkBuilder.build(), executor) + : new TestScheduler(args[0], + frameworkBuilder.build(), + executor, + Integer.parseInt(args[2])); + + Mesos mesos; + if (credentialBuilder != null) { + mesos = version == 1 + ? new JNIMesos(scheduler, args[0], credentialBuilder.build()) + : new V0Mesos(scheduler, + frameworkBuilder.build(), + args[0], + credentialBuilder.build()); + } else { + mesos = version == 1 + ? new JNIMesos(scheduler, args[0]) + : new V0Mesos(scheduler, frameworkBuilder.build(), args[0]); + } + + lock.lock(); + try { + while (!finished) { + finishedCondtion.await(); + } + } finally { + lock.unlock(); + } + + System.exit(0); + } + + static boolean finished = false; + final static Lock lock = new ReentrantLock(); + final static Condition finishedCondtion = lock.newCondition(); +} http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/v1-test-framework.in ---------------------------------------------------------------------- diff --git a/src/examples/java/v1-test-framework.in b/src/examples/java/v1-test-framework.in new file mode 100644 index 0000000..6e5e581 --- /dev/null +++ b/src/examples/java/v1-test-framework.in @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +# This script uses MESOS_SOURCE_DIR and MESOS_BUILD_DIR which come +# from configuration substitutions. +MESOS_SOURCE_DIR=@abs_top_srcdir@ +MESOS_BUILD_DIR=@abs_top_builddir@ + +# Locate Java from environment or use configure discovered location. +JAVA_HOME=${JAVA_HOME-@JAVA_HOME@} +JAVA=${JAVA-${JAVA_HOME}/bin/java} + +# Use colors for errors. +. ${MESOS_SOURCE_DIR}/support/colors.sh + +PROTOBUF_JAR=@PROTOBUF_JAR@ + +test ! -e ${PROTOBUF_JAR} && \ + echo "${RED}Failed to find ${PROTOBUF_JAR}${NORMAL}" && \ + exit 1 + +MESOS_JAR=${MESOS_BUILD_DIR}/src/java/target/mesos-@PACKAGE_VERSION@.jar + +test ! -e ${MESOS_JAR} && \ + echo "${RED}Failed to find ${MESOS_JAR}${NORMAL}" && \ + exit 1 + +EXAMPLES_JAR=${MESOS_BUILD_DIR}/src/examples.jar + +test ! -e ${EXAMPLES_JAR} && \ + echo "${RED}Failed to find ${EXAMPLES_JAR}${NORMAL}" && \ + exit 1 + +# Need to run in the directory containing this script so that the +# framework is able to find the executor. +cd `dirname ${0}` + +exec ${JAVA} -cp ${PROTOBUF_JAR}:${MESOS_JAR}:${EXAMPLES_JAR} \ + -Djava.library.path=${MESOS_BUILD_DIR}/src/.libs \ + V1TestFramework "${@}" http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/examples_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp index cac5304..52fac33 100644 --- a/src/tests/examples_tests.cpp +++ b/src/tests/examples_tests.cpp @@ -38,6 +38,10 @@ TEST_SCRIPT(ExamplesTest, DiskFullFramework, TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh") TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh") TEST_SCRIPT(ExamplesTest, JavaLog, "java_log_test.sh") + +// TODO(anand): Parameterize these tests on version. +TEST_SCRIPT(ExamplesTest, V0JavaFramework, "java_v0_framework_test.sh") +TEST_SCRIPT(ExamplesTest, V1JavaFramework, "java_v1_framework_test.sh") #endif #ifdef MESOS_HAS_PYTHON http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v0_framework_test.sh ---------------------------------------------------------------------- diff --git a/src/tests/java_v0_framework_test.sh b/src/tests/java_v0_framework_test.sh new file mode 100755 index 0000000..12daae4 --- /dev/null +++ b/src/tests/java_v0_framework_test.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash + +# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment. + +env | grep MESOS_SOURCE_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_SOURCE_DIR in environment" && \ + exit 1 + +env | grep MESOS_BUILD_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_BUILD_DIR in environment" && \ + exit 1 + +source ${MESOS_SOURCE_DIR}/support/atexit.sh + +MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX` + +atexit "rm -rf ${MESOS_WORK_DIR}" +export MESOS_WORK_DIR=${MESOS_WORK_DIR} + +# Lower the authentication timeout to speed up the test (the master +# may drop the authentication message while it is recovering). +export MESOS_AUTHENTICATION_TIMEOUT=200ms + +# Set local Mesos runner to use 3 slaves. +export MESOS_NUM_SLAVES=3 + +# Set resources for the slave. +export MESOS_RESOURCES="cpus:2;mem:10240" + +# Set isolation for the slave. +export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem" + +# Set launcher for the slave. +export MESOS_LAUNCHER="posix" + +# Check that the Java test framework executes without crashing (returns 0). +exec $MESOS_BUILD_DIR/src/examples/java/v1-test-framework local 0 http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v1_framework_test.sh ---------------------------------------------------------------------- diff --git a/src/tests/java_v1_framework_test.sh b/src/tests/java_v1_framework_test.sh new file mode 100755 index 0000000..c49e11c --- /dev/null +++ b/src/tests/java_v1_framework_test.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment. + +env | grep MESOS_SOURCE_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_SOURCE_DIR in environment" && \ + exit 1 + +env | grep MESOS_BUILD_DIR >/dev/null + +test $? != 0 && \ + echo "Failed to find MESOS_BUILD_DIR in environment" && \ + exit 1 + +source ${MESOS_SOURCE_DIR}/support/atexit.sh + +MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX` + +atexit "rm -rf ${MESOS_WORK_DIR}" +export MESOS_WORK_DIR=${MESOS_WORK_DIR} + +# Set the connection delay to 0 to speed up the tests. +export MESOS_CONNECTION_DELAY_MAX=0ms; + +# Set local Mesos runner to use 3 slaves. +export MESOS_NUM_SLAVES=3 + +# Set resources for the slave. +export MESOS_RESOURCES="cpus:2;mem:10240" + +# Set isolation for the slave. +export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem" + +# Set launcher for the slave. +export MESOS_LAUNCHER="posix" + +# Check that the Java test framework executes without crashing (returns 0). +exec $MESOS_BUILD_DIR/src/examples/java/v1-test-framework local 1