Repository: mesos
Updated Branches:
  refs/heads/master 9567fb420 -> 74ceb051c


Added an example test for the V0/V1 Mesos Java implementation.

Review: https://reviews.apache.org/r/50254


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/74ceb051
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/74ceb051
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/74ceb051

Branch: refs/heads/master
Commit: 74ceb051c454326ace02a7c670c0ea4d911519c2
Parents: 68c27d1
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Jul 19 23:36:34 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Aug 2 08:25:48 2016 -0700

----------------------------------------------------------------------
 configure.ac                           |   2 +
 src/Makefile.am                        |   8 +-
 src/examples/java/V1TestFramework.java | 386 ++++++++++++++++++++++++++++
 src/examples/java/v1-test-framework.in |  39 +++
 src/tests/examples_tests.cpp           |   4 +
 src/tests/java_v0_framework_test.sh    |  41 +++
 src/tests/java_v1_framework_test.sh    |  40 +++
 7 files changed, 518 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index d213690..f82c631 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1069,6 +1069,8 @@ __EOF__
                   [chmod +x 
src/examples/java/test-multiple-executors-framework])
   AC_CONFIG_FILES([src/examples/java/test-log],
                   [chmod +x src/examples/java/test-log])
+  AC_CONFIG_FILES([src/examples/java/v1-test-framework],
+                  [chmod +x src/examples/java/v1-test-framework])
   AC_CONFIG_FILES([src/java/mesos.pom])
 
   AC_DEFINE([MESOS_HAS_JAVA])

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index b9cc040..30d8f72 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1532,7 +1532,8 @@ EXAMPLES_SOURCE =                                         
        \
   $(srcdir)/examples/java/TestExecutor.java                            \
   $(srcdir)/examples/java/TestFramework.java                           \
   $(srcdir)/examples/java/TestLog.java                                 \
-  $(srcdir)/examples/java/TestMultipleExecutorsFramework.java
+  $(srcdir)/examples/java/TestMultipleExecutorsFramework.java          \
+  $(srcdir)/examples/java/V1TestFramework.java
 EXTRA_DIST += $(EXAMPLES_SOURCE)
 
 
@@ -2236,7 +2237,8 @@ EXAMPLESCRIPTSJAVA =                                      
        \
   examples/java/test-exception-framework                       \
   examples/java/test-framework                                 \
   examples/java/test-log                                       \
-  examples/java/test-multiple-executors-framework
+  examples/java/test-multiple-executors-framework              \
+  examples/java/v1-test-framework
 
 check_SCRIPTS += $(EXAMPLESCRIPTSJAVA)
 mesos_tests_DEPENDENCIES += $(EXAMPLESCRIPTSJAVA)
@@ -2270,6 +2272,8 @@ dist_check_SCRIPTS +=                                     
        \
   tests/java_exception_test.sh                                 \
   tests/java_framework_test.sh                                 \
   tests/java_log_test.sh                                       \
+  tests/java_v0_framework_test.sh                              \
+  tests/java_v1_framework_test.sh                              \
   tests/no_executor_framework_test.sh                          \
   tests/persistent_volume_framework_test.sh                    \
   tests/python_framework_test.sh                               \

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/V1TestFramework.java
----------------------------------------------------------------------
diff --git a/src/examples/java/V1TestFramework.java 
b/src/examples/java/V1TestFramework.java
new file mode 100644
index 0000000..a41edbc
--- /dev/null
+++ b/src/examples/java/V1TestFramework.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import java.util.concurrent.locks.*;
+
+import org.apache.mesos.v1.*;
+
+import org.apache.mesos.v1.Protos.*;
+
+import org.apache.mesos.v1.scheduler.JNIMesos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.apache.mesos.v1.scheduler.V0Mesos;
+
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+
+public class V1TestFramework {
+  static class TestScheduler implements Scheduler {
+
+    // Convenience constructor: delegates to the four-argument constructor
+    // with a task count of 5.
+    public TestScheduler(
+        String master,
+        FrameworkInfo framework,
+        ExecutorInfo executor) {
+      this(master, framework, executor, 5);
+    }
+
+    // Stores the framework description, the executor to use for every task
+    // and the number of tasks to run; the scheduler starts out in the
+    // DISCONNECTED state.
+    //
+    // NOTE: `master` is accepted but is neither stored nor otherwise used
+    // by this constructor.
+    public TestScheduler(
+        String master,
+        FrameworkInfo framework,
+        ExecutorInfo executor,
+        int totalTasks) {
+      this.framework = framework;
+      this.executor = executor;
+      this.totalTasks = totalTasks;
+      this.state = State.DISCONNECTED;
+    }
+
+    // Marks the scheduler as connected and starts a timer that attempts to
+    // subscribe immediately and then once per second, via
+    // doReliableRegistration().
+    //
+    // TODO(anand): Synchronize on `state` instead.
+    @Override
+    public synchronized void connected(final Mesos mesos) {
+      System.out.println("Connected");
+
+      state = State.CONNECTED;
+
+      // Drop any timer left over from an earlier connection so that at
+      // most one retry timer is ever active; overwriting the field without
+      // cancelling would orphan the old timer's thread.
+      cancelRetryTimer();
+
+      retryTimer = new Timer();
+      retryTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          doReliableRegistration(mesos);
+        }
+      }, 0, 1000); // Repeat every 1 second
+    }
+
+    // Marks the scheduler as disconnected and stops any pending
+    // subscription retries.
+    @Override
+    public synchronized void disconnected(Mesos mesos) {
+      System.out.println("Disconnected");
+
+      // Refer to the constant through the enum type, not through the
+      // `state` instance.
+      state = State.DISCONNECTED;
+      cancelRetryTimer();
+    }
+
+    // Handles one event from the master. SUBSCRIBED records the framework
+    // ID, OFFERS and UPDATE are forwarded to offers() and update(), ERROR
+    // terminates the process with exit code 1, and every other event type
+    // is only logged.
+    @Override
+    public synchronized void received(Mesos mesos, Event event) {
+      switch (event.getType()) {
+        case SUBSCRIBED: {
+          frameworkId = event.getSubscribed().getFrameworkId();
+          state = State.SUBSCRIBED;
+
+          System.out.println("Subscribed with ID " + frameworkId);
+          break;
+        }
+
+        case OFFERS: {
+          System.out.println("Received an OFFERS event");
+
+          offers(mesos, event.getOffers().getOffersList());
+          break;
+        }
+
+        case RESCIND: {
+          System.out.println("Received a RESCIND event");
+          break;
+        }
+
+        case UPDATE: {
+          System.out.println("Received an UPDATE event");
+
+          update(mesos, event.getUpdate().getStatus());
+          break;
+        }
+
+        case MESSAGE: {
+          System.out.println("Received a MESSAGE event");
+          break;
+        }
+
+        case FAILURE: {
+          System.out.println("Received a FAILURE event");
+          break;
+        }
+
+        case ERROR: {
+          System.out.println("Received an ERROR event");
+          System.exit(1);
+
+          // Not reached in practice, but keeps this case from falling
+          // through into HEARTBEAT as far as the compiler is concerned.
+          break;
+        }
+
+        case HEARTBEAT: {
+          // TODO(anand): Force reconnection with the master upon lack
+          // of heartbeats.
+          System.out.println("Received a HEARTBEAT event");
+          break;
+        }
+
+        case UNKNOWN: {
+          System.out.println("Received an UNKNOWN event");
+          break;
+        }
+      }
+    }
+
+    public synchronized void doReliableRegistration(Mesos mesos) {
+      if (state == State.SUBSCRIBED || state == State.DISCONNECTED) {
+        cancelRetryTimer();
+        return;
+      }
+
+      Call.Builder callBuilder = Call.newBuilder()
+          .setType(Call.Type.SUBSCRIBE)
+          .setSubscribe(Call.Subscribe.newBuilder()
+            .setFrameworkInfo(framework)
+            .build());
+
+      mesos.send(callBuilder.build());
+    }
+
+    private void cancelRetryTimer() {
+      // Cancel previously active timer (if one exists).
+      if (retryTimer != null) {
+        retryTimer.cancel();
+        retryTimer.purge();
+      }
+
+      retryTimer = null;
+    }
+
+    public void offers(Mesos mesos, List<Offer> offers) {
+      double CPUS_PER_TASK = 1;
+      double MEM_PER_TASK = 128;
+
+      for (Offer offer : offers) {
+        Offer.Operation.Launch.Builder launch =
+          Offer.Operation.Launch.newBuilder();
+
+        double offerCpus = 0;
+        double offerMem = 0;
+        for (Resource resource : offer.getResourcesList()) {
+          if (resource.getName().equals("cpus")) {
+            offerCpus += resource.getScalar().getValue();
+          } else if (resource.getName().equals("mem")) {
+            offerMem += resource.getScalar().getValue();
+          }
+        }
+
+        System.out.println(
+            "Received offer " + offer.getId().getValue() + " with cpus: " +
+            offerCpus + " and mem: " + offerMem);
+
+        double remainingCpus = offerCpus;
+        double remainingMem = offerMem;
+        while (launchedTasks < totalTasks &&
+               remainingCpus >= CPUS_PER_TASK &&
+               remainingMem >= MEM_PER_TASK) {
+          TaskID taskId = TaskID.newBuilder()
+            .setValue(Integer.toString(launchedTasks++)).build();
+
+          System.out.println("Launching task " + taskId.getValue() +
+                             " using offer " + offer.getId().getValue());
+
+          TaskInfo task = TaskInfo.newBuilder()
+            .setName("task " + taskId.getValue())
+            .setTaskId(taskId)
+            .setAgentId(offer.getAgentId())
+            .addResources(Resource.newBuilder()
+                          .setName("cpus")
+                          .setType(Value.Type.SCALAR)
+                          .setScalar(Value.Scalar.newBuilder()
+                            .setValue(CPUS_PER_TASK)
+                            .build()))
+            .addResources(Resource.newBuilder()
+                          .setName("mem")
+                          .setType(Value.Type.SCALAR)
+                          .setScalar(Value.Scalar.newBuilder()
+                            .setValue(MEM_PER_TASK)
+                            .build()))
+            .setExecutor(ExecutorInfo.newBuilder(executor))
+            .build();
+
+          launch.addTaskInfos(TaskInfo.newBuilder(task));
+
+          remainingCpus -= CPUS_PER_TASK;
+          remainingMem -= MEM_PER_TASK;
+        }
+
+        mesos.send(Call.newBuilder()
+          .setType(Call.Type.ACCEPT)
+          .setFrameworkId(frameworkId)
+          .setAccept(Call.Accept.newBuilder()
+            .addOfferIds(offer.getId())
+            .addOperations(Offer.Operation.newBuilder()
+              .setType(Offer.Operation.Type.LAUNCH)
+              .setLaunch(launch)
+              .build())
+            .setFilters(Filters.newBuilder()
+              .setRefuseSeconds(1)
+              .build()))
+          .build());
+      }
+    }
+
+    // Processes one task status update: counts TASK_FINISHED updates and
+    // signals main() once `totalTasks` of them have been seen, exits the
+    // process with code 1 on TASK_LOST/TASK_KILLED/TASK_FAILED, and
+    // otherwise acknowledges the update if it carries a UUID.
+    public void update(Mesos mesos, TaskStatus status) {
+      System.out.println(
+          "Status update: task " + status.getTaskId().getValue() +
+          " is in state " + status.getState().getValueDescriptor().getName());
+
+      if (status.getState() == TaskState.TASK_FINISHED) {
+        finishedTasks++;
+        System.out.println("Finished tasks: " + finishedTasks);
+        if (finishedTasks == totalTasks) {
+          // Wake up main(), which waits on this condition.
+          lock.lock();
+          try {
+            finished = true;
+            finishedCondtion.signal();
+          } finally {
+            lock.unlock();
+          }
+        }
+      }
+
+      if (status.getState() == TaskState.TASK_LOST ||
+          status.getState() == TaskState.TASK_KILLED ||
+          status.getState() == TaskState.TASK_FAILED) {
+        System.err.println(
+            "Aborting because task " + status.getTaskId().getValue() +
+            " is in unexpected state " +
+            status.getState().getValueDescriptor().getName() +
+            " with reason '" +
+            status.getReason().getValueDescriptor().getName() + "'" +
+            " from source '" +
+            status.getSource().getValueDescriptor().getName() + "'" +
+            " with message '" + status.getMessage() + "'");
+
+        System.exit(1);
+      }
+
+      // Only status updates that carry a UUID are to be acknowledged;
+      // an update without one has nothing to acknowledge.
+      if (!status.hasUuid()) {
+        return;
+      }
+
+      mesos.send(Call.newBuilder()
+        .setType(Call.Type.ACKNOWLEDGE)
+        .setFrameworkId(frameworkId)
+        .setAcknowledge(Call.Acknowledge.newBuilder()
+          .setAgentId(status.getAgentId())
+          .setTaskId(status.getTaskId())
+          .setUuid(status.getUuid())
+          .build())
+        .build());
+    }
+
+    // Connection state of this scheduler.
+    private enum State {
+      // Initial state; also entered from disconnected().
+      DISCONNECTED,
+      // Entered from connected(); SUBSCRIBE calls are sent in this state.
+      CONNECTED,
+      // Entered when a SUBSCRIBED event is received.
+      SUBSCRIBED
+    }
+
+    private FrameworkInfo framework;
+    private FrameworkID frameworkId;
+    private final ExecutorInfo executor;
+    private final int totalTasks;
+    private int launchedTasks = 0;
+    private int finishedTasks = 0;
+    private State state;
+    private Timer retryTimer = null;
+  }
+
+  // Prints the command-line synopsis to stderr. The trailing task count is
+  // optional; main() accepts either two or three arguments.
+  private static void usage() {
+    String name = V1TestFramework.class.getName();
+    System.err.println(
+        "Usage: " + name + " master version{0,1} [total tasks]");
+  }
+
+  // Entry point. Expects `master version [total tasks]`, where version is
+  // 0 (V0Mesos) or 1 (JNIMesos). Builds the executor and framework
+  // descriptions, starts the scheduler and blocks until update() reports
+  // that all tasks have finished, then exits with code 0. Invalid
+  // arguments print the usage message and exit with code 1.
+  //
+  // If DEFAULT_PRINCIPAL is set in the environment it is used as the
+  // framework principal; if DEFAULT_SECRET is set as well, the pair is
+  // passed to the Mesos implementation as a credential.
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2 || args.length > 3) {
+      usage();
+      System.exit(1);
+    }
+
+    // Parse every numeric argument up front so that malformed input is
+    // reported through the usage message rather than a stack trace.
+    int version = 0;
+    int totalTasks = 0;
+    try {
+      version = Integer.parseInt(args[1]);
+      if (args.length == 3) {
+        totalTasks = Integer.parseInt(args[2]);
+      }
+    } catch (NumberFormatException e) {
+      usage();
+      System.exit(1);
+    }
+
+    if (version != 0 && version != 1) {
+      usage();
+      System.exit(1);
+    }
+
+    // A non-positive task count can never be reached by the finished-task
+    // counter, so the wait below would never return.
+    if (args.length == 3 && totalTasks <= 0) {
+      usage();
+      System.exit(1);
+    }
+
+    String uri = new File("./test-executor").getCanonicalPath();
+
+    ExecutorInfo executor = ExecutorInfo.newBuilder()
+      .setExecutorId(ExecutorID.newBuilder().setValue("default"))
+      .setCommand(CommandInfo.newBuilder().setValue(uri))
+      .setName("Test Executor (Java)")
+      .setSource("java_test")
+      .build();
+
+    FrameworkInfo.Builder frameworkBuilder = FrameworkInfo.newBuilder()
+        .setUser(System.getProperty("user.name", "default-user"))
+        .setName("V" + version + " Test Framework (Java)");
+
+    Credential.Builder credentialBuilder = null;
+
+    String principal = System.getenv("DEFAULT_PRINCIPAL");
+    String secret = System.getenv("DEFAULT_SECRET");
+
+    if (principal != null) {
+      frameworkBuilder.setPrincipal(principal);
+
+      if (secret != null) {
+        credentialBuilder = Credential.newBuilder()
+          .setPrincipal(principal)
+          .setSecret(secret);
+      }
+    }
+
+    // Build the framework description once and share it.
+    FrameworkInfo framework = frameworkBuilder.build();
+
+    Scheduler scheduler = args.length == 2
+      ? new TestScheduler(args[0], framework, executor)
+      : new TestScheduler(args[0], framework, executor, totalTasks);
+
+    Mesos mesos;
+    if (credentialBuilder != null) {
+      mesos = version == 1
+                ? new JNIMesos(scheduler, args[0], credentialBuilder.build())
+                : new V0Mesos(scheduler,
+                              framework,
+                              args[0],
+                              credentialBuilder.build());
+    } else {
+      mesos = version == 1
+                ? new JNIMesos(scheduler, args[0])
+                : new V0Mesos(scheduler, framework, args[0]);
+    }
+
+    // Block until update() sets `finished` and signals the condition.
+    lock.lock();
+    try {
+      while (!finished) {
+        finishedCondtion.await();
+      }
+    } finally {
+      lock.unlock();
+    }
+
+    System.exit(0);
+  }
+
+  static boolean finished = false;
+  final static Lock lock = new ReentrantLock();
+  final static Condition finishedCondtion = lock.newCondition();
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/examples/java/v1-test-framework.in
----------------------------------------------------------------------
diff --git a/src/examples/java/v1-test-framework.in 
b/src/examples/java/v1-test-framework.in
new file mode 100644
index 0000000..6e5e581
--- /dev/null
+++ b/src/examples/java/v1-test-framework.in
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+# This script uses MESOS_SOURCE_DIR and MESOS_BUILD_DIR which come
+# from configuration substitutions.
+MESOS_SOURCE_DIR=@abs_top_srcdir@
+MESOS_BUILD_DIR=@abs_top_builddir@
+
+# Locate Java from environment or use configure discovered location.
+JAVA_HOME=${JAVA_HOME-@JAVA_HOME@}
+JAVA=${JAVA-${JAVA_HOME}/bin/java}
+
+# Use colors for errors.
+. ${MESOS_SOURCE_DIR}/support/colors.sh
+
+PROTOBUF_JAR=@PROTOBUF_JAR@
+
+test ! -e ${PROTOBUF_JAR} && \
+  echo "${RED}Failed to find ${PROTOBUF_JAR}${NORMAL}" && \
+  exit 1
+
+MESOS_JAR=${MESOS_BUILD_DIR}/src/java/target/mesos-@PACKAGE_VERSION@.jar
+
+test ! -e ${MESOS_JAR} && \
+  echo "${RED}Failed to find ${MESOS_JAR}${NORMAL}" && \
+  exit 1
+
+EXAMPLES_JAR=${MESOS_BUILD_DIR}/src/examples.jar
+
+test ! -e ${EXAMPLES_JAR} && \
+  echo "${RED}Failed to find ${EXAMPLES_JAR}${NORMAL}" && \
+  exit 1
+
+# Need to run in the directory containing this script so that the
+# framework is able to find the executor.
+cd `dirname ${0}`
+
+exec ${JAVA} -cp ${PROTOBUF_JAR}:${MESOS_JAR}:${EXAMPLES_JAR} \
+  -Djava.library.path=${MESOS_BUILD_DIR}/src/.libs \
+  V1TestFramework "${@}"

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/examples_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/examples_tests.cpp b/src/tests/examples_tests.cpp
index cac5304..52fac33 100644
--- a/src/tests/examples_tests.cpp
+++ b/src/tests/examples_tests.cpp
@@ -38,6 +38,10 @@ TEST_SCRIPT(ExamplesTest, DiskFullFramework,
 TEST_SCRIPT(ExamplesTest, JavaFramework, "java_framework_test.sh")
 TEST_SCRIPT(ExamplesTest, JavaException, "java_exception_test.sh")
 TEST_SCRIPT(ExamplesTest, JavaLog, "java_log_test.sh")
+
+// TODO(anand): Parameterize these tests on version.
+TEST_SCRIPT(ExamplesTest, V0JavaFramework, "java_v0_framework_test.sh")
+TEST_SCRIPT(ExamplesTest, V1JavaFramework, "java_v1_framework_test.sh")
 #endif
 
 #ifdef MESOS_HAS_PYTHON

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v0_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/java_v0_framework_test.sh 
b/src/tests/java_v0_framework_test.sh
new file mode 100755
index 0000000..12daae4
--- /dev/null
+++ b/src/tests/java_v0_framework_test.sh
@@ -0,0 +1,41 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Lower the authentication timeout to speed up the test (the master
+# may drop the authentication message while it is recovering).
+export MESOS_AUTHENTICATION_TIMEOUT=200ms
+
+# Set local Mesos runner to use 3 slaves.
+export MESOS_NUM_SLAVES=3
+
+# Set resources for the slave.
+export MESOS_RESOURCES="cpus:2;mem:10240"
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the Java test framework executes without crashing (returns 0).
+exec $MESOS_BUILD_DIR/src/examples/java/v1-test-framework local 0

http://git-wip-us.apache.org/repos/asf/mesos/blob/74ceb051/src/tests/java_v1_framework_test.sh
----------------------------------------------------------------------
diff --git a/src/tests/java_v1_framework_test.sh 
b/src/tests/java_v1_framework_test.sh
new file mode 100755
index 0000000..c49e11c
--- /dev/null
+++ b/src/tests/java_v1_framework_test.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+# Expecting MESOS_SOURCE_DIR and MESOS_BUILD_DIR to be in environment.
+
+env | grep MESOS_SOURCE_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_SOURCE_DIR in environment" && \
+  exit 1
+
+env | grep MESOS_BUILD_DIR >/dev/null
+
+test $? != 0 && \
+  echo "Failed to find MESOS_BUILD_DIR in environment" && \
+  exit 1
+
+source ${MESOS_SOURCE_DIR}/support/atexit.sh
+
+MESOS_WORK_DIR=`mktemp -d -t mesos-XXXXXX`
+
+atexit "rm -rf ${MESOS_WORK_DIR}"
+export MESOS_WORK_DIR=${MESOS_WORK_DIR}
+
+# Set the connection delay to 0 to speed up the tests.
+export MESOS_CONNECTION_DELAY_MAX=0ms;
+
+# Set local Mesos runner to use 3 slaves.
+export MESOS_NUM_SLAVES=3
+
+# Set resources for the slave.
+export MESOS_RESOURCES="cpus:2;mem:10240"
+
+# Set isolation for the slave.
+export MESOS_ISOLATION="filesystem/posix,posix/cpu,posix/mem"
+
+# Set launcher for the slave.
+export MESOS_LAUNCHER="posix"
+
+# Check that the Java test framework executes without crashing (returns 0).
+exec $MESOS_BUILD_DIR/src/examples/java/v1-test-framework local 1

Reply via email to