Repository: aurora
Updated Branches:
  refs/heads/master e91013fde -> d9c911a62


Add an executor service decorator that gates async operations.

Bugs closed: AURORA-1395

Reviewed at https://reviews.apache.org/r/36710/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/d9c911a6
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/d9c911a6
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/d9c911a6

Branch: refs/heads/master
Commit: d9c911a62bca08fc79043ed09d0bf036f3ca9eb5
Parents: e91013f
Author: Bill Farner <[email protected]>
Authored: Mon Jul 27 12:12:04 2015 -0700
Committer: Bill Farner <[email protected]>
Committed: Mon Jul 27 12:12:04 2015 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/async/DelayExecutor.java   | 33 +++++++
 .../scheduler/async/FlushableWorkQueue.java     | 25 ++++++
 .../scheduler/async/GatedDelayExecutor.java     | 63 ++++++++++++++
 .../scheduler/events/PubsubEventModule.java     |  1 +
 .../scheduler/async/GatedDelayExecutorTest.java | 91 ++++++++++++++++++++
 5 files changed, 213 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/d9c911a6/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java
new file mode 100644
index 0000000..940b15d
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/DelayExecutor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.Executor;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+/**
+ * An executor that supports executing work after a minimum time delay.
+ *
+ * <p>Because this extends {@link Executor}, implementations also accept work with no explicit
+ * delay through {@link Executor#execute(Runnable)}.
+ */
+public interface DelayExecutor extends Executor {
+
+  /**
+   * Executes {@code work} after no less than {@code minDelay}.
+   *
+   * <p>The delay is a lower bound only; this contract places no upper bound on how long after
+   * {@code minDelay} the work actually runs.
+   *
+   * @param work Work to execute.
+   * @param minDelay Minimum amount of time to wait before executing the work.
+   */
+  void execute(Runnable work, Amount<Long, Time> minDelay);
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/d9c911a6/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
new file mode 100644
index 0000000..11a1c2a
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+/**
+ * A work queue that only executes pending work when flushed.
+ *
+ * <p>This interface exposes only the release operation; how work is added to the queue is left
+ * to the implementing class.
+ */
+public interface FlushableWorkQueue {
+
+  /**
+   * Makes pending work available for execution.
+   */
+  void flush();
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/d9c911a6/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
new file mode 100644
index 0000000..1893a9b
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An executor that queues work until flushed.
+ *
+ * <p>Submitted work is never handed to the delegate directly.  Each submission is recorded in an
+ * internal queue, and {@link #flush()} drains that queue into the delegate.  The gate does not
+ * stay open after a flush: work submitted later is held until the next call to {@link #flush()}.
+ *
+ * <p>All methods synchronize on this instance, so submissions and flushes from different threads
+ * are serialized.
+ */
+class GatedDelayExecutor implements DelayExecutor, FlushableWorkQueue {
+
+  private final ScheduledExecutorService executor;
+  private final Queue<Runnable> queue = Lists.newLinkedList();
+
+  /**
+   * Creates a gated delay executor that will flush work to the provided {@code delegate}.
+   *
+   * @param delegate Delegate to execute work with when flushed.
+   */
+  GatedDelayExecutor(ScheduledExecutorService delegate) {
+    this.executor = requireNonNull(delegate);
+  }
+
+  private synchronized void enqueue(Runnable work) {
+    queue.add(work);
+  }
+
+  /**
+   * Submits all queued work to the delegate, in the order it was enqueued.
+   *
+   * <p>Each item is removed from the queue before it is submitted.  If a submission throws, the
+   * exception propagates to the caller, that item is not retried, and the items behind it remain
+   * queued for a later flush.  The delegate is invoked while this instance's lock is held.
+   */
+  @Override
+  public synchronized void flush() {
+    for (Runnable work : Iterables.consumingIterable(queue)) {
+      work.run();
+    }
+  }
+
+  /**
+   * Queues {@code command} for submission to the delegate on the next {@link #flush()}.
+   *
+   * @param command Work to hand to the delegate when flushed.
+   * @throws NullPointerException If {@code command} is null.
+   */
+  @Override
+  public synchronized void execute(Runnable command) {
+    // Reject bad input at the call site instead of failing later in the middle of a flush.
+    requireNonNull(command);
+    enqueue(() -> executor.execute(command));
+  }
+
+  /**
+   * Queues {@code work} to be scheduled on the delegate on the next {@link #flush()}.
+   *
+   * <p>The delegate's timer only starts when the work is scheduled during the flush, so time
+   * spent waiting in the queue does not count toward {@code minDelay}.
+   *
+   * @param work Work to schedule when flushed.
+   * @param minDelay Delay passed to the delegate when the work is scheduled.
+   * @throws NullPointerException If {@code work} or {@code minDelay} is null.
+   */
+  @Override
+  public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) {
+    // Reject bad input at the call site instead of failing later in the middle of a flush.
+    requireNonNull(work);
+    requireNonNull(minDelay);
+    enqueue(() -> executor.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/d9c911a6/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index c85979d..ccecfdb 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -75,6 +75,7 @@ public final class PubsubEventModule extends AbstractModule {
     this.async = requireNonNull(async);
   }
 
+  // TODO(wfarner): Remove the async argument and accept an Executor instead.
   public PubsubEventModule(boolean async) {
     this(async, Logger.getLogger(PubsubEventModule.class.getName()));
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/d9c911a6/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java b/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
new file mode 100644
index 0000000..eb37ace
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.twitter.common.quantity.Amount;
+import com.twitter.common.quantity.Time;
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class GatedDelayExecutorTest extends EasyMockTest {
+
+  private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, 
Time.SECONDS);
+
+  private ScheduledExecutorService mockExecutor;
+  private Runnable runnable;
+  private GatedDelayExecutor gatedExecutor;
+
+  @Before
+  public void setUp() {
+    mockExecutor = createMock(ScheduledExecutorService.class);
+    runnable = createMock(Runnable.class);
+    gatedExecutor = new GatedDelayExecutor(mockExecutor); // Executor under test wraps the mock.
+  }
+
+  @Test
+  public void testNoFlush() {
+    control.replay(); // No expectations recorded; presumably any mock call now fails the test.
+
+    gatedExecutor.execute(runnable);
+    // flush() is not called, so no work is performed.
+  }
+
+  private IExpectationSetters<?> invokeWorkWhenSubmitted() { // Applies to the last recorded call.
+    return expectLastCall().andAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() {
+        ((Runnable) EasyMock.getCurrentArguments()[0]).run(); // Run the submitted work inline.
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testExecute() {
+    mockExecutor.execute(EasyMock.<Runnable>anyObject());
+    invokeWorkWhenSubmitted();
+    runnable.run(); // Recorded before replay(): an expectation, not an invocation.
+    expectLastCall();
+
+    control.replay();
+
+    gatedExecutor.execute(runnable);
+    gatedExecutor.flush(); // Releases the queued work to the mock delegate.
+  }
+
+  @Test
+  public void testExecuteAfterDelay() {
+    mockExecutor.schedule(
+        EasyMock.<Runnable>anyObject(),
+        eq(ONE_SECOND.getValue().longValue()),
+        eq(ONE_SECOND.getUnit().getTimeUnit()));
+    invokeWorkWhenSubmitted();
+    runnable.run(); // Expectation recorded without expectLastCall(), unlike testExecute.
+
+    control.replay();
+
+    gatedExecutor.execute(runnable, ONE_SECOND);
+    gatedExecutor.flush(); // The delay is only passed to schedule() at this point.
+  }
+}

Reply via email to