Repository: aurora
Updated Branches:
  refs/heads/master 2652fe02a -> 705dbc7cd


http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
index c599fe3..b132cde 100644
--- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java
@@ -14,33 +14,14 @@
 package org.apache.aurora.scheduler.mesos;
 
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.net.InetAddresses;
 
-import org.apache.aurora.common.application.Lifecycle;
-import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.gen.HostAttributes;
-import org.apache.aurora.scheduler.HostOffer;
-import org.apache.aurora.scheduler.TaskStatusHandler;
-import org.apache.aurora.scheduler.base.Conversions;
-import org.apache.aurora.scheduler.base.SchedulerException;
-import org.apache.aurora.scheduler.events.EventSink;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverDisconnected;
-import org.apache.aurora.scheduler.events.PubsubEvent.DriverRegistered;
-import org.apache.aurora.scheduler.events.PubsubEvent.TaskStatusReceived;
-import org.apache.aurora.scheduler.offers.OfferManager;
-import org.apache.aurora.scheduler.stats.CachedCounters;
-import org.apache.aurora.scheduler.storage.Storage.StorageException;
-import org.apache.aurora.scheduler.storage.entities.IHostAttributes;
-import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
-import org.apache.aurora.scheduler.testing.FakeStatsProvider;
 import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
+import org.apache.mesos.v1.Protos.ExecutorID;
 import org.apache.mesos.v1.Protos.FrameworkID;
 import org.apache.mesos.v1.Protos.OfferID;
 import org.apache.mesos.v1.Protos.TaskID;
@@ -48,64 +29,36 @@ import org.apache.mesos.v1.Protos.TaskState;
 import org.apache.mesos.v1.Protos.TaskStatus;
 import org.apache.mesos.v1.Protos.TaskStatus.Reason;
 import org.apache.mesos.v1.Protos.TaskStatus.Source;
-import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import static org.apache.aurora.gen.MaintenanceMode.DRAINING;
-import static org.apache.aurora.gen.MaintenanceMode.NONE;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.expect;
+import static org.apache.aurora.scheduler.mesos.ProtosConversion.convert;
 import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class MesosSchedulerImplTest extends EasyMockTest {
 
   private static final String FRAMEWORK_ID = "framework-id";
   private static final FrameworkID FRAMEWORK =
       FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
-
+  private static final String MASTER_ID = "master-id";
+  private static final Protos.MasterInfo MASTER = 
Protos.MasterInfo.newBuilder()
+      .setId(MASTER_ID)
+      
.setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) 
//NOPMD
+      .setPort(5050).build();
   private static final String SLAVE_HOST = "slave-hostname";
   private static final Protos.SlaveID SLAVE_ID =
       Protos.SlaveID.newBuilder().setValue("slave-id").build();
-  private static final String SLAVE_HOST_2 = "slave-hostname-2";
-  private static final Protos.SlaveID SLAVE_ID_2 =
-      Protos.SlaveID.newBuilder().setValue("slave-id-2").build();
-  private static final Protos.ExecutorID EXECUTOR_ID =
-      Protos.ExecutorID.newBuilder().setValue("executor-id").build();
 
   private static final OfferID OFFER_ID = 
OfferID.newBuilder().setValue("offer-id").build();
   private static final Protos.Offer OFFER = Protos.Offer.newBuilder()
-          .setFrameworkId(ProtosConversion.convert(FRAMEWORK))
+          .setFrameworkId(convert(FRAMEWORK))
           .setSlaveId(SLAVE_ID)
           .setHostname(SLAVE_HOST)
-          .setId(ProtosConversion.convert(OFFER_ID))
-          .build();
-  private static final HostOffer HOST_OFFER = new HostOffer(
-      ProtosConversion.convert(OFFER),
-      IHostAttributes.build(
-          new HostAttributes()
-              .setHost(SLAVE_HOST)
-              .setSlaveId(SLAVE_ID.getValue())
-              .setMode(NONE)
-              .setAttributes(ImmutableSet.of())));
-  private static final OfferID OFFER_ID_2 = 
OfferID.newBuilder().setValue("offer-id-2").build();
-  private static final Protos.Offer OFFER_2 = Protos.Offer.newBuilder(OFFER)
-          .setSlaveId(SLAVE_ID_2)
-          .setHostname(SLAVE_HOST_2)
-          .setId(ProtosConversion.convert(OFFER_ID_2))
+          .setId(convert(OFFER_ID))
           .build();
-  private static final HostOffer HOST_OFFER_2 = new HostOffer(
-      ProtosConversion.convert(OFFER_2),
-      IHostAttributes.build(
-          new HostAttributes()
-              .setHost(SLAVE_HOST_2)
-              .setSlaveId(SLAVE_ID_2.getValue())
-              .setMode(NONE)
-              .setAttributes(ImmutableSet.of())));
+
+  private static final ExecutorID EXECUTOR_ID =
+      ExecutorID.newBuilder().setValue("executor-id").build();
 
   private static final TaskStatus STATUS_NO_REASON = TaskStatus.newBuilder()
       .setState(TaskState.TASK_RUNNING)
@@ -121,358 +74,129 @@ public class MesosSchedulerImplTest extends EasyMockTest {
       .setReason(Reason.REASON_COMMAND_EXECUTOR_FAILED)
       .build();
 
-  private static final TaskStatus STATUS_RECONCILIATION = STATUS_NO_REASON
-      .toBuilder()
-      .setReason(Reason.REASON_RECONCILIATION)
-      .build();
-
-  private static final TaskStatusReceived PUBSUB_RECONCILIATION_EVENT = new 
TaskStatusReceived(
-      STATUS_RECONCILIATION.getState(),
-      Optional.of(STATUS_RECONCILIATION.getSource()),
-      Optional.of(STATUS_RECONCILIATION.getReason()),
-      Optional.of(1000000L)
-  );
-
-  private StorageTestUtil storageUtil;
-  private Command shutdownCommand;
-  private TaskStatusHandler statusHandler;
-  private OfferManager offerManager;
+  private MesosCallbackHandler handler;
   private SchedulerDriver driver;
-  private EventSink eventSink;
-  private FakeStatsProvider statsProvider;
 
   private MesosSchedulerImpl scheduler;
 
   @Before
   public void setUp() {
-    Logger log = LoggerFactory.getLogger("");
-    initializeScheduler(log);
-  }
-
-  private void initializeScheduler(Logger logger) {
-    storageUtil = new StorageTestUtil(this);
-    shutdownCommand = createMock(Command.class);
-    statusHandler = createMock(TaskStatusHandler.class);
-    offerManager = createMock(OfferManager.class);
-    eventSink = createMock(EventSink.class);
-    statsProvider = new FakeStatsProvider();
-
-    scheduler = new MesosSchedulerImpl(
-        storageUtil.storage,
-        new Lifecycle(shutdownCommand),
-        statusHandler,
-        offerManager,
-        eventSink,
-        MoreExecutors.sameThreadExecutor(),
-        new CachedCounters(statsProvider),
-        logger,
-        statsProvider);
+    handler = createMock(MesosCallbackHandler.class);
     driver = createMock(SchedulerDriver.class);
+    scheduler = new MesosSchedulerImpl(handler);
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testBadOrdering() {
-    control.replay();
+  @Test
+  public void testSlaveLost() {
+    handler.handleLostAgent(convert(SLAVE_ID));
+    expectLastCall().once();
 
-    // Should fail since the scheduler is not yet registered.
-    scheduler.resourceOffers(driver, ImmutableList.of());
-  }
+    control.replay();
 
-  @Test
-  public void testNoOffers() {
-    new AbstractRegisteredTest() {
-      @Override
-      void test() {
-        scheduler.resourceOffers(driver, ImmutableList.of());
-      }
-    }.run();
+    scheduler.slaveLost(driver, SLAVE_ID);
   }
 
   @Test
-  public void testAcceptOffer() {
-    new AbstractOfferTest() {
-      @Override
-      void respondToOffer() {
-        expectOfferAttributesSaved(HOST_OFFER);
-        offerManager.addOffer(HOST_OFFER);
-      }
-    }.run();
-  }
+  public void testRegistered() {
+    handler.handleRegistration(FRAMEWORK, convert(MASTER));
+    expectLastCall().once();
 
-  @Test
-  public void testAcceptOfferDebugLogging() {
-    Logger mockLogger = createMock(Logger.class);
-    mockLogger.info(anyString());
-    mockLogger.debug(anyString(), EasyMock.<Object>anyObject());
-    initializeScheduler(mockLogger);
-
-    new AbstractOfferTest() {
-      @Override
-      void respondToOffer() {
-        expectOfferAttributesSaved(HOST_OFFER);
-        offerManager.addOffer(HOST_OFFER);
-      }
-    }.run();
-  }
+    control.replay();
 
-  @Test
-  public void testAttributesModePreserved() {
-    new AbstractOfferTest() {
-      @Override
-      void respondToOffer() {
-        IHostAttributes draining =
-            
IHostAttributes.build(HOST_OFFER.getAttributes().newBuilder().setMode(DRAINING));
-        
expect(storageUtil.attributeStore.getHostAttributes(HOST_OFFER.getOffer().getHostname()))
-            .andReturn(Optional.of(draining));
-        IHostAttributes saved = IHostAttributes.build(
-            
Conversions.getAttributes(HOST_OFFER.getOffer()).newBuilder().setMode(DRAINING));
-        
expect(storageUtil.attributeStore.saveHostAttributes(saved)).andReturn(true);
-
-        HostOffer offer = new HostOffer(HOST_OFFER.getOffer(), draining);
-        offerManager.addOffer(offer);
-      }
-    }.run();
+    scheduler.registered(driver, convert(FRAMEWORK), MASTER);
   }
 
   @Test
-  public void testStatusUpdate() {
-    // Test multiple variations of fields in TaskStatus to cover all branches.
-    new StatusUpdater(STATUS).run();
-    control.verify();
-    control.reset();
-    new StatusUpdater(STATUS.toBuilder().clearSource().build()).run();
-    control.verify();
-    control.reset();
-    new StatusUpdater(STATUS.toBuilder().clearReason().build()).run();
-    control.verify();
-    control.reset();
-    new StatusUpdater(STATUS.toBuilder().clearMessage().build()).run();
-  }
+  public void testDisconnected() {
+    handler.handleDisconnection();
+    expectLastCall().once();
 
-  @Test(expected = SchedulerException.class)
-  public void testStatusUpdateFails() {
-    new AbstractStatusTest() {
-      @Override
-      void expectations() {
-        eventSink.post(new TaskStatusReceived(
-            STATUS.getState(),
-            Optional.of(STATUS.getSource()),
-            Optional.of(STATUS.getReason()),
-            Optional.of(1000000L)
-        ));
-        statusHandler.statusUpdate(status);
-        expectLastCall().andThrow(new StorageException("Injected."));
-      }
-    }.run();
-  }
+    control.replay();
 
-  @Test
-  public void testMultipleOffers() {
-    new AbstractRegisteredTest() {
-      @Override
-      void expectations() {
-        expectOfferAttributesSaved(HOST_OFFER);
-        expectOfferAttributesSaved(HOST_OFFER_2);
-        offerManager.addOffer(HOST_OFFER);
-        offerManager.addOffer(HOST_OFFER_2);
-      }
-
-      @Override
-      void test() {
-        scheduler.resourceOffers(driver,
-            ImmutableList.of(
-                ProtosConversion.convert(HOST_OFFER.getOffer()),
-                ProtosConversion.convert(HOST_OFFER_2.getOffer())));
-      }
-    }.run();
+    scheduler.disconnected(driver);
   }
 
   @Test
-  public void testDisconnected() {
-    new AbstractRegisteredTest() {
-      @Override
-      void expectations() {
-        eventSink.post(new DriverDisconnected());
-      }
-
-      @Override
-      void test() {
-        scheduler.disconnected(driver);
-        assertEquals(1L, 
statsProvider.getLongValue("scheduler_framework_disconnects"));
-      }
-    }.run();
-  }
+  public void testReRegistered() {
+    handler.handleReregistration(convert(MASTER));
+    expectLastCall().once();
 
-  @Test
-  public void testFrameworkMessageIgnored() {
     control.replay();
 
-    scheduler.frameworkMessage(
-        driver,
-        EXECUTOR_ID,
-        SLAVE_ID,
-        "hello".getBytes(StandardCharsets.UTF_8));
+    scheduler.reregistered(driver, MASTER);
   }
 
   @Test
-  public void testSlaveLost() {
+  public void testResourceOffers() {
+    handler.handleRegistration(FRAMEWORK, convert(MASTER));
+    expectLastCall().once();
+    handler.handleOffers(ImmutableList.of(convert(OFFER)));
+    expectLastCall().once();
+
     control.replay();
 
-    scheduler.slaveLost(driver, SLAVE_ID);
-    assertEquals(1L, statsProvider.getLongValue("slaves_lost"));
+    scheduler.registered(driver, convert(FRAMEWORK), MASTER);
+    scheduler.resourceOffers(driver, ImmutableList.of(OFFER));
   }
 
-  @Test
-  public void testReregistered() {
+  @Test(expected = IllegalStateException.class)
+  public void testBadOrdering() {
     control.replay();
 
-    scheduler.reregistered(driver, Protos.MasterInfo.getDefaultInstance());
+    // Should fail since the scheduler is not yet registered.
+    scheduler.resourceOffers(driver, ImmutableList.of());
   }
 
   @Test
   public void testOfferRescinded() {
-    offerManager.cancelOffer(OFFER_ID);
+    handler.handleRescind(OFFER_ID);
+    expectLastCall().once();
 
     control.replay();
 
-    scheduler.offerRescinded(driver, ProtosConversion.convert(OFFER_ID));
-    assertEquals(1L, statsProvider.getLongValue("offers_rescinded"));
+    scheduler.offerRescinded(driver, convert(OFFER_ID));
   }
 
   @Test
-  public void testError() {
-    shutdownCommand.execute();
+  public void testStatusUpdate() {
+    handler.handleUpdate(STATUS);
+    expectLastCall().once();
 
     control.replay();
 
-    scheduler.error(driver, "error");
+    scheduler.statusUpdate(driver, convert(STATUS));
   }
 
   @Test
-  public void testExecutorLost() {
+  public void testError() {
+    handler.handleError("Oh No!");
+    expectLastCall().once();
+
     control.replay();
 
-    scheduler.executorLost(driver, EXECUTOR_ID, SLAVE_ID, 1);
+    scheduler.error(driver, "Oh No!");
   }
 
   @Test
-  public void testStatusReconciliationAcceptsDebugLogging() {
-    Logger mockLogger = createMock(Logger.class);
-    mockLogger.info(anyString());
-    mockLogger.debug(anyString());
-    initializeScheduler(mockLogger);
-
-    new AbstractStatusReconciliationTest() {
-      @Override
-      void expectations() {
-        eventSink.post(PUBSUB_RECONCILIATION_EVENT);
-        statusHandler.statusUpdate(status);
-      }
-    }.run();
-  }
-
-  private class StatusUpdater extends AbstractStatusTest {
-    StatusUpdater(TaskStatus status) {
-      super(status);
-    }
-
-    @Override
-    void expectations() {
-      eventSink.post(new TaskStatusReceived(
-          status.getState(),
-          Optional.fromNullable(status.getSource()),
-          status.hasReason() ? Optional.of(status.getReason()) : 
Optional.absent(),
-          Optional.of(1000000L)
-      ));
-      statusHandler.statusUpdate(status);
-    }
-  }
-
-  private void expectOfferAttributesSaved(HostOffer offer) {
-    
expect(storageUtil.attributeStore.getHostAttributes(offer.getOffer().getHostname()))
-        .andReturn(Optional.absent());
-    IHostAttributes defaultMode = IHostAttributes.build(
-        
Conversions.getAttributes(offer.getOffer()).newBuilder().setMode(NONE));
-    
expect(storageUtil.attributeStore.saveHostAttributes(defaultMode)).andReturn(true);
-  }
-
-  private abstract class AbstractRegisteredTest {
-    private final AtomicBoolean runCalled = new AtomicBoolean(false);
-
-    AbstractRegisteredTest() {
-      // Prevent otherwise silent noop tests that forget to call run().
-      addTearDown(new TearDown() {
-        @Override
-        public void tearDown() {
-          assertTrue(runCalled.get());
-        }
-      });
-    }
-
-    void run() {
-      runCalled.set(true);
-      eventSink.post(new DriverRegistered());
-      storageUtil.expectOperations();
-      storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
-      expectations();
-
-      control.replay();
-
-      scheduler.registered(
-          driver,
-          ProtosConversion.convert(FRAMEWORK),
-          Protos.MasterInfo.getDefaultInstance());
-      test();
-    }
-
-    void expectations() {
-      // Default no-op, subclasses may override.
-    }
-
-    abstract void test();
-  }
+  public void testFrameworkMessage() {
+    handler.handleMessage(EXECUTOR_ID, convert(SLAVE_ID));
+    expectLastCall().once();
 
-  private abstract class AbstractOfferTest extends AbstractRegisteredTest {
-    AbstractOfferTest() {
-      super();
-    }
-
-    abstract void respondToOffer();
-
-    @Override
-    void expectations() {
-      respondToOffer();
-    }
+    control.replay();
 
-    @Override
-    void test() {
-      scheduler.resourceOffers(
-          driver,
-          ImmutableList.of(ProtosConversion.convert(HOST_OFFER.getOffer())));
-    }
+    scheduler.frameworkMessage(
+        driver,
+        convert(EXECUTOR_ID),
+        SLAVE_ID,
+        "message".getBytes(StandardCharsets.UTF_8));
   }
 
-  private abstract class AbstractStatusTest extends AbstractRegisteredTest {
-    protected final TaskStatus status;
-
-    AbstractStatusTest() {
-      this(STATUS);
-    }
-
-    AbstractStatusTest(TaskStatus status) {
-      super();
-      this.status = status;
-    }
+  @Test
+  public void testExecutorLost() {
+    handler.handleLostExecutor(EXECUTOR_ID, convert(SLAVE_ID), 1);
 
-    @Override
-    void test() {
-      scheduler.statusUpdate(driver, ProtosConversion.convert(status));
-    }
-  }
+    control.replay();
 
-  private abstract class AbstractStatusReconciliationTest extends 
AbstractStatusTest {
-    AbstractStatusReconciliationTest() {
-      super(STATUS_RECONCILIATION);
-    }
+    scheduler.executorLost(driver, convert(EXECUTOR_ID), SLAVE_ID, 1);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
new file mode 100644
index 0000000..988ec50
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedMesosSchedulerImplTest.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.net.InetAddresses;
+import com.google.protobuf.ByteString;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.stats.CachedCounters;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.aurora.scheduler.testing.FakeStatsProvider;
+import org.apache.mesos.v1.Protos.AgentID;
+import org.apache.mesos.v1.Protos.ExecutorID;
+import org.apache.mesos.v1.Protos.FrameworkID;
+import org.apache.mesos.v1.Protos.FrameworkInfo;
+import org.apache.mesos.v1.Protos.MasterInfo;
+import org.apache.mesos.v1.Protos.Offer;
+import org.apache.mesos.v1.Protos.OfferID;
+import org.apache.mesos.v1.Protos.TaskID;
+import org.apache.mesos.v1.Protos.TaskState;
+import org.apache.mesos.v1.Protos.TaskStatus;
+import org.apache.mesos.v1.Protos.TaskStatus.Source;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Protos.Event;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link VersionedMesosSchedulerImpl}. Each test records the calls it expects on
+ * the mocked {@link MesosCallbackHandler} or {@link Mesos} driver, then feeds the scheduler a
+ * callback or {@link Event}.
+ */
+public class VersionedMesosSchedulerImplTest extends EasyMockTest {
+
+  private MesosCallbackHandler handler;
+  private StorageTestUtil storageUtil;
+  private Mesos driver;
+  private FakeStatsProvider statsProvider;
+  private DriverSettings driverSettings;
+
+  private VersionedMesosSchedulerImpl scheduler;
+
+  private static final String AGENT_HOST = "slave-hostname";
+  private static final AgentID AGENT_ID =
+      AgentID.newBuilder().setValue("slave-id").build();
+
+  private static final String FRAMEWORK_ID = "framework-id";
+  private static final FrameworkID FRAMEWORK =
+      FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build();
+  private static final FrameworkInfo FRAMEWORK_INFO = FrameworkInfo.newBuilder()
+      .setName("name")
+      .setUser("user")
+      .setCheckpoint(true)
+      .setFailoverTimeout(1000)
+      .build();
+
+  private static final String MASTER_ID = "master-id";
+  private static final MasterInfo MASTER = MasterInfo.newBuilder()
+      .setId(MASTER_ID)
+      .setIp(InetAddresses.coerceToInteger(InetAddresses.forString("1.2.3.4"))) //NOPMD
+      .setPort(5050).build();
+
+  private static final OfferID OFFER_ID = OfferID.newBuilder().setValue("offer-id").build();
+  private static final Offer OFFER = Offer.newBuilder()
+      .setFrameworkId(FRAMEWORK)
+      .setAgentId(AGENT_ID)
+      .setHostname(AGENT_HOST)
+      .setId(OFFER_ID)
+      .build();
+
+  private static final TaskStatus STATUS = TaskStatus.newBuilder()
+      .setState(TaskState.TASK_RUNNING)
+      .setSource(Source.SOURCE_AGENT)
+      .setMessage("message")
+      .setTimestamp(1D)
+      .setTaskId(TaskID.newBuilder().setValue("task-id").build())
+      .build();
+
+  private static final ExecutorID EXECUTOR_ID =
+      ExecutorID.newBuilder().setValue("executor-id").build();
+
+  private static final Event OFFER_EVENT = Event.newBuilder()
+      .setType(Event.Type.OFFERS)
+      .setOffers(Event.Offers.newBuilder()
+          .addOffers(OFFER))
+      .build();
+
+  private static final Event SUBSCRIBED_EVENT = Event.newBuilder()
+      .setType(Event.Type.SUBSCRIBED)
+      .setSubscribed(Event.Subscribed.newBuilder()
+          .setFrameworkId(FRAMEWORK)
+          .setHeartbeatIntervalSeconds(15)
+          .setMasterInfo(MASTER))
+      .build();
+
+  private static final Event UPDATE_EVENT = Event.newBuilder()
+      .setType(Event.Type.UPDATE)
+      .setUpdate(Event.Update.newBuilder()
+          .setStatus(STATUS))
+      .build();
+
+  private static final Event RESCIND_EVENT = Event.newBuilder()
+      .setType(Event.Type.RESCIND)
+      .setRescind(Event.Rescind.newBuilder().setOfferId(OFFER_ID))
+      .build();
+
+  private static final Event MESSAGE_EVENT = Event.newBuilder()
+      .setType(Event.Type.MESSAGE)
+      .setMessage(Event.Message.newBuilder()
+          .setAgentId(AGENT_ID)
+          .setExecutorId(EXECUTOR_ID)
+          .setData(ByteString.copyFromUtf8("message")))
+      .build();
+
+  private static final Event ERROR_EVENT = Event.newBuilder()
+      .setType(Event.Type.ERROR)
+      .setError(Event.Error.newBuilder().setMessage("Oh no!"))
+      .build();
+
+  private static final Event FAILED_AGENT_EVENT = Event.newBuilder()
+      .setType(Event.Type.FAILURE)
+      .setFailure(Event.Failure.newBuilder().setAgentId(AGENT_ID))
+      .build();
+
+  @Before
+  public void setUp() {
+    handler = createMock(MesosCallbackHandler.class);
+    storageUtil = new StorageTestUtil(this);
+    driver = createMock(Mesos.class);
+    statsProvider = new FakeStatsProvider();
+    driverSettings = createMock(DriverSettings.class);
+
+    scheduler = new VersionedMesosSchedulerImpl(
+        handler,
+        new CachedCounters(statsProvider),
+        storageUtil.storage,
+        driverSettings);
+  }
+
+  @Test
+  public void testConnected() {
+    // Once the V1 driver has connected, we need to establish a subscription to get events
+
+    storageUtil.expectOperations();
+    expect(storageUtil.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+    expect(driverSettings.getFrameworkInfo()).andReturn(FRAMEWORK_INFO);
+
+    Capture<Call> subscribeCapture = createCapture();
+
+    driver.send(capture(subscribeCapture));
+    expectLastCall().once();
+
+    control.replay();
+
+    scheduler.connected(driver);
+
+    assertTrue(subscribeCapture.hasCaptured());
+
+    Call subscribe = subscribeCapture.getValue();
+
+    // The SUBSCRIBE call carries the stored framework ID, directly and inside FrameworkInfo.
+    assertEquals(Call.Type.SUBSCRIBE, subscribe.getType());
+    assertEquals(FRAMEWORK, subscribe.getFrameworkId());
+    assertEquals(
+        FRAMEWORK_INFO.toBuilder().setId(FRAMEWORK).build(),
+        subscribe.getSubscribe().getFrameworkInfo());
+  }
+
+  @Test
+  public void testDisconnected() {
+    handler.handleDisconnection();
+
+    control.replay();
+
+    scheduler.disconnected(driver);
+  }
+
+  @Test
+  public void testSubscription() {
+    handler.handleRegistration(FRAMEWORK, MASTER);
+
+    control.replay();
+
+    scheduler.received(driver, SUBSCRIBED_EVENT);
+
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_SUBSCRIBED"));
+  }
+
+  @Test
+  public void testOffers() {
+    // Register via SUBSCRIBED first; testBadOrdering covers an offer arriving before that.
+    handler.handleRegistration(FRAMEWORK, MASTER);
+    handler.handleOffers(ImmutableList.of(OFFER));
+
+    control.replay();
+
+    scheduler.received(driver, SUBSCRIBED_EVENT);
+    scheduler.received(driver, OFFER_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_OFFERS"));
+  }
+
+  @Test
+  public void testRescind() {
+    handler.handleRescind(OFFER_ID);
+
+    control.replay();
+
+    scheduler.received(driver, RESCIND_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_RESCIND"));
+  }
+
+  @Test
+  public void testUpdate() {
+    handler.handleUpdate(STATUS);
+
+    control.replay();
+
+    scheduler.received(driver, UPDATE_EVENT);
+
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_UPDATE"));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testBadOrdering() {
+    // get an offer before the driver has registered
+
+    control.replay();
+
+    scheduler.received(driver, OFFER_EVENT);
+  }
+
+  @Test
+  public void testMessage() {
+    handler.handleMessage(EXECUTOR_ID, AGENT_ID);
+
+    control.replay();
+
+    scheduler.received(driver, MESSAGE_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_MESSAGE"));
+  }
+
+  @Test
+  public void testError() {
+    handler.handleError("Oh no!");
+
+    control.replay();
+
+    scheduler.received(driver, ERROR_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_ERROR"));
+  }
+
+  @Test
+  public void testFailedAgent() {
+    handler.handleLostAgent(AGENT_ID);
+
+    control.replay();
+
+    scheduler.received(driver, FAILED_AGENT_EVENT);
+    assertEquals(1L, statsProvider.getLongValue("mesos_scheduler_event_FAILURE"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
new file mode 100644
index 0000000..72aede8
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/mesos/VersionedSchedulerDriverServiceTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.mesos;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
+import org.apache.mesos.v1.Protos;
+import org.apache.mesos.v1.scheduler.Mesos;
+import org.apache.mesos.v1.scheduler.Protos.Call;
+import org.apache.mesos.v1.scheduler.Scheduler;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class VersionedSchedulerDriverServiceTest extends EasyMockTest {
+
+  private static final String FRAMEWORK_ID = "framework_id";
+  private static final Protos.OfferID OFFER_ID =
+      Protos.OfferID.newBuilder().setValue("offer-id").build();
+  private static final Protos.Filters FILTER =
+      Protos.Filters.newBuilder().setRefuseSeconds(10).build();
+  private static final DriverSettings SETTINGS = new DriverSettings(
+      "fakemaster",
+      Optional.absent(),
+      Protos.FrameworkInfo.newBuilder()
+          .setUser("framework user")
+          .setName("test framework")
+          .build());
+
+  private StorageTestUtil storage;
+  private Scheduler scheduler;
+  private VersionedSchedulerDriverService driverService;
+  private VersionedDriverFactory driverFactory;
+  private Mesos mesos;
+
+  @Before
+  public void setUp() {
+    scheduler = createMock(Scheduler.class);
+    storage = new StorageTestUtil(this);
+    driverFactory = createMock(VersionedDriverFactory.class);
+    mesos = createMock(Mesos.class);
+    driverService = new VersionedSchedulerDriverService(
+        storage.storage,
+        SETTINGS,
+        scheduler,
+        driverFactory);
+  }
+
+  @Test
+  public void testNoopStop() {
+    control.replay();
+
+    driverService.stopAsync().awaitTerminated();
+  }
+
+  @Test
+  public void testStop() {
+    expectStart();
+    control.replay();
+
+    driverService.startAsync().awaitRunning();
+    driverService.stopAsync().awaitTerminated();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testExceptionBeforeStart() {
+    control.replay();
+    driverService.killTask("task-id");
+  }
+
+  @Test
+  public void testBlockingBeforeRegistered() throws InterruptedException {
+    expectStart();
+    control.replay();
+    driverService.startAsync().awaitRunning();
+
+    Thread killRunner = new Thread(() -> {
+      driverService.killTask("task-id");
+    });
+
+    killRunner.start();
+
+    // A hack to ensure the thread actually executes the method
+    Thread.sleep(1000L);
+    assertEquals(Thread.State.WAITING, killRunner.getState());
+
+    killRunner.interrupt();
+  }
+
+  @Test
+  public void testKill() {
+    expectStart();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+    Capture<Call> killCapture = createCapture();
+    mesos.send(capture(killCapture));
+    expectLastCall().once();
+
+    control.replay();
+    driverService.startAsync().awaitRunning();
+    driverService.registered(new PubsubEvent.DriverRegistered());
+
+    driverService.killTask("task-id");
+
+    assertTrue(killCapture.hasCaptured());
+    Call kill = killCapture.getValue();
+    assertEquals(kill.getFrameworkId().getValue(), FRAMEWORK_ID);
+    assertEquals(kill.getType(), Call.Type.KILL);
+    assertEquals(kill.getKill().getTaskId().getValue(), "task-id");
+  }
+
+  @Test
+  public void testDecline() {
+    expectStart();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+    Capture<Call> declineCapture = createCapture();
+    mesos.send(capture(declineCapture));
+    expectLastCall().once();
+
+    control.replay();
+    driverService.startAsync().awaitRunning();
+    driverService.registered(new PubsubEvent.DriverRegistered());
+
+    driverService.declineOffer(OFFER_ID, FILTER);
+
+    assertTrue(declineCapture.hasCaptured());
+    Call decline = declineCapture.getValue();
+    assertEquals(decline.getFrameworkId().getValue(), FRAMEWORK_ID);
+    assertEquals(decline.getType(), Call.Type.DECLINE);
+    assertEquals(decline.getDecline().getOfferIdsList(), ImmutableList.of(OFFER_ID));
+    assertEquals(decline.getDecline().getFilters(), FILTER);
+  }
+
+  @Test
+  public void testAccept() {
+    expectStart();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+    Capture<Call> acceptCapture = createCapture();
+    mesos.send(capture(acceptCapture));
+    expectLastCall().once();
+
+    control.replay();
+    driverService.startAsync().awaitRunning();
+    driverService.registered(new PubsubEvent.DriverRegistered());
+
+    driverService.acceptOffers(OFFER_ID, ImmutableList.of(), FILTER);
+
+    assertTrue(acceptCapture.hasCaptured());
+    Call accept = acceptCapture.getValue();
+    assertEquals(accept.getFrameworkId().getValue(), FRAMEWORK_ID);
+    assertEquals(accept.getType(), Call.Type.ACCEPT);
+    assertEquals(accept.getAccept().getFilters(), FILTER);
+    assertEquals(accept.getAccept().getOfferIdsList(), ImmutableList.of(OFFER_ID));
+    assertEquals(accept.getAccept().getOperationsList(), ImmutableList.of());
+  }
+
+  private void expectStart() {
+    storage.expectOperations();
+    expect(storage.schedulerStore.fetchFrameworkId()).andReturn(Optional.of(FRAMEWORK_ID));
+
+    Protos.FrameworkInfo.Builder builder = SETTINGS.getFrameworkInfo().toBuilder();
+    builder.setId(Protos.FrameworkID.newBuilder().setValue(FRAMEWORK_ID));
+
+    expect(driverFactory.create(
+        scheduler,
+        builder.build(),
+        SETTINGS.getMasterUri(),
+        SETTINGS.getCredentials()
+    )).andReturn(mesos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/705dbc7c/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index f2275c7..1421821 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -67,6 +67,7 @@ import org.apache.shiro.subject.Subject;
 import org.junit.Test;
 
 import static org.apache.aurora.gen.ResponseCode.OK;
+import static org.apache.aurora.scheduler.app.SchedulerMain.DriverKind.SCHEDULER_DRIVER;
 import static org.junit.Assert.assertEquals;
 
 public class ThriftIT extends EasyMockTest {
@@ -97,7 +98,7 @@ public class ThriftIT extends EasyMockTest {
             install(new TierModule(TaskTestUtil.TIER_CONFIG));
             bind(ExecutorSettings.class).toInstance(TestExecutorSettings.THERMOS_EXECUTOR);
 
-            install(new AppModule(configurationManagerSettings));
+            install(new AppModule(configurationManagerSettings, SCHEDULER_DRIVER));
 
             bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
 

Reply via email to