Repository: aurora Updated Branches: refs/heads/master 02ffef5de -> 03ec02389
Replace org.apache.aurora.common.base.Closure with java.util.function.Consumer Commons came with a `Closure` type which is identical to the Java 8 type `Consumer`. This replaces the former with the latter in the interests of reducing the commons code and fork. Reviewed at https://reviews.apache.org/r/46167/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/03ec0238 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/03ec0238 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/03ec0238 Branch: refs/heads/master Commit: 03ec02389544a1c67638b601d9256c339fb014e9 Parents: 02ffef5 Author: Zameer Manji <[email protected]> Authored: Fri Apr 15 14:02:57 2016 -0700 Committer: Zameer Manji <[email protected]> Committed: Fri Apr 15 14:02:57 2016 -0700 ---------------------------------------------------------------------- .../org/apache/aurora/common/base/Closure.java | 33 ------ .../org/apache/aurora/common/base/Closures.java | 93 --------------- .../apache/aurora/common/base/Consumers.java | 89 ++++++++++++++ .../apache/aurora/common/util/StateMachine.java | 40 +++---- .../util/templating/StringTemplateHelper.java | 8 +- .../apache/aurora/common/base/ClosuresTest.java | 116 ------------------ .../aurora/common/base/ConsumersTest.java | 118 +++++++++++++++++++ .../aurora/common/util/StateMachineTest.java | 43 +++---- .../aurora/scheduler/SchedulerLifecycle.java | 32 ++--- .../scheduler/http/JerseyTemplateServlet.java | 4 +- .../scheduler/state/TaskStateMachine.java | 24 ++-- .../scheduler/storage/log/LogStorage.java | 18 +-- .../scheduler/storage/log/StreamManager.java | 7 +- .../storage/log/StreamManagerImpl.java | 6 +- .../scheduler/storage/log/LogManagerTest.java | 18 +-- 15 files changed, 308 insertions(+), 341 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/base/Closure.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/base/Closure.java b/commons/src/main/java/org/apache/aurora/common/base/Closure.java deleted file mode 100644 index 42c9d11..0000000 --- a/commons/src/main/java/org/apache/aurora/common/base/Closure.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.base; - -/** - * A closure that does not throw any checked exceptions. - * - * @param <T> Closure value type. 
- * - * @author John Sirois - */ -@FunctionalInterface -public interface Closure<T> { - // convenience typedef - - /** - * Performs a unit of work on item - * - * @param item the item to perform work against - */ - void execute(T item); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/base/Closures.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/base/Closures.java b/commons/src/main/java/org/apache/aurora/common/base/Closures.java deleted file mode 100644 index 0953119..0000000 --- a/commons/src/main/java/org/apache/aurora/common/base/Closures.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.base; - -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Utilities for dealing with Closures. 
- * - * @author John Sirois - */ -public final class Closures { - - private static final Closure<?> NOOP = item -> { - // noop - }; - - private Closures() { - // utility - } - - /** - * Combines multiple closures into a single closure, whose calls are replicated sequentially - * in the order that they were provided. - * If an exception is encountered from a closure it propagates to the top-level closure and the - * remaining closures are not executed. - * - * @param closures Closures to combine. - * @param <T> Type accepted by the closures. - * @return A single closure that will fan out all calls to {@link Closure#execute(Object)} to - * the wrapped closures. - */ - public static <T> Closure<T> combine(Iterable<Closure<T>> closures) { - checkNotNull(closures); - checkArgument(Iterables.all(closures, Predicates.notNull())); - - final Iterable<Closure<T>> closuresCopy = ImmutableList.copyOf(closures); - - return item -> { - for (Closure<T> closure : closuresCopy) { - closure.execute(item); - } - }; - } - - /** - * Applies a filter to a closure, such that the closure will only be called when the filter is - * satisfied (returns {@code true}}. - * - * @param filter Filter to determine when {@code closure} is called. - * @param closure Closure to filter. - * @param <T> Type handled by the filter and the closure. - * @return A filtered closure. - */ - public static <T> Closure<T> filter(final Predicate<T> filter, final Closure<T> closure) { - checkNotNull(filter); - checkNotNull(closure); - - return item -> { - if (filter.apply(item)) { - closure.execute(item); - } - }; - } - - /** - * Returns a closure that will do nothing. - * - * @param <T> The closure argument type. - * @return A closure that does nothing. 
- */ - @SuppressWarnings("unchecked") - public static <T> Closure<T> noop() { - return (Closure<T>) NOOP; - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/base/Consumers.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/base/Consumers.java b/commons/src/main/java/org/apache/aurora/common/base/Consumers.java new file mode 100644 index 0000000..06f3269 --- /dev/null +++ b/commons/src/main/java/org/apache/aurora/common/base/Consumers.java @@ -0,0 +1,89 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.base; + +import java.util.List; +import java.util.function.Consumer; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Utilities for dealing with Consumers. + * + * @author John Sirois + */ +public final class Consumers { + + private static final Consumer<?> NOOP = item -> { + // noop + }; + + private Consumers() { + // utility + } + + /** + * Combines multiple consumers into a single consumer, whose calls are replicated sequentially + * in the order that they were provided. 
+ * If an exception is encountered from a consumer it propagates to the top-level consumer and the + * remaining consumers are not executed. + * + * @param consumers Consumers to combine. + * @param <T> Type accepted by the consumers. + * @return A single consumer that will fan out all calls to {@link Consumer#accept(Object)} to + * the wrapped consumers. + */ + public static <T> Consumer<T> combine(List<Consumer<T>> consumers) { + checkNotNull(consumers); + checkArgument(Iterables.all(consumers, Predicates.notNull())); + + return consumers.stream().reduce(noop(), Consumer::andThen); + } + + /** + * Applies a filter to a consumer, such that the consumer will only be called when the filter is + * satisfied (returns {@code true}). + * + * @param filter Filter to determine when {@code consumer} is called. + * @param consumer Consumer to filter. + * @param <T> Type handled by the filter and the consumer. + * @return A filtered consumer. + */ + public static <T> Consumer<T> filter(final Predicate<T> filter, final Consumer<T> consumer) { + checkNotNull(filter); + checkNotNull(consumer); + + return item -> { + if (filter.apply(item)) { + consumer.accept(item); + } + }; + } + + /** + * Returns a consumer that will do nothing. + * + * @param <T> The consumer argument type. + * @return A consumer that does nothing. 
+ */ + @SuppressWarnings("unchecked") + public static <T> Consumer<T> noop() { + return (Consumer<T>) NOOP; + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java b/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java index e8aa000..785c5f1 100644 --- a/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java +++ b/commons/src/main/java/org/apache/aurora/common/util/StateMachine.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -30,8 +31,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Closures; +import org.apache.aurora.common.base.Consumers; import org.apache.commons.lang.builder.HashCodeBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +58,7 @@ public class StateMachine<T> { // Stores mapping from states to the states that the machine is allowed to transition into. 
private final Multimap<T, T> stateTransitions; - private final Closure<Transition<T>> transitionCallback; + private final Consumer<Transition<T>> transitionCallback; private final boolean throwOnBadTransition; private volatile T currentState; @@ -69,7 +69,7 @@ public class StateMachine<T> { private StateMachine(String name, T initialState, Multimap<T, T> stateTransitions, - Closure<Transition<T>> transitionCallback, + Consumer<Transition<T>> transitionCallback, boolean throwOnBadTransition) { this.name = name; this.currentState = initialState; @@ -157,7 +157,7 @@ public class StateMachine<T> { writeLock.unlock(); } - transitionCallback.execute(new Transition<T>(currentCopy, nextState, transitionAllowed)); + transitionCallback.accept(new Transition<T>(currentCopy, nextState, transitionAllowed)); return transitionAllowed; } @@ -186,17 +186,17 @@ public class StateMachine<T> { public static class Rule<T> { private final T from; private final Set<T> to; - private final Closure<Transition<T>> callback; + private final Consumer<Transition<T>> callback; private Rule(T from) { this(from, ImmutableSet.<T>of()); } private Rule(T from, Set<T> to) { - this(from, to, Closures.<Transition<T>>noop()); + this(from, to, Consumers.<Transition<T>>noop()); } - private Rule(T from, Set<T> to, Closure<Transition<T>> callback) { + private Rule(T from, Set<T> to, Consumer<Transition<T>> callback) { this.from = checkNotNull(from); this.to = checkNotNull(to); this.callback = checkNotNull(callback); @@ -210,7 +210,7 @@ public class StateMachine<T> { * @return A new rule that is identical to this rule, but with the provided * callback */ - public Rule<T> withCallback(Closure<Transition<T>> callback) { + public Rule<T> withCallback(Consumer<Transition<T>> callback) { return new Rule<T>(from, to, callback); } @@ -280,7 +280,7 @@ public class StateMachine<T> { private final String name; private T initialState; private final Multimap<T, T> stateTransitions = HashMultimap.create(); - private final 
List<Closure<Transition<T>>> transitionCallbacks = Lists.newArrayList(); + private final List<Consumer<Transition<T>>> transitionCallbacks = Lists.newArrayList(); private boolean throwOnBadTransition = true; public Builder(String name) { @@ -319,8 +319,8 @@ public class StateMachine<T> { * @param transitionStates Allowed transitions from {@code state}. * @return A reference to the builder. */ - public Builder<T> addState(Closure<Transition<T>> callback, T state, - Set<T> transitionStates) { + public Builder<T> addState(Consumer<Transition<T>> callback, T state, + Set<T> transitionStates) { checkNotNull(callback); checkNotNull(state); @@ -335,15 +335,15 @@ public class StateMachine<T> { } /** - * Varargs version of {@link #addState(Closure, Object, java.util.Set)}. + * Varargs version of {@link #addState(Consumer, Object, java.util.Set)}. * * @param callback Callback to notify of any transition attempted from the state. * @param state State to add. * @param transitionStates Allowed transitions from {@code state}. * @return A reference to the builder. */ - public Builder<T> addState(Closure<Transition<T>> callback, T state, - T... transitionStates) { + public Builder<T> addState(Consumer<Transition<T>> callback, T state, + T... transitionStates) { Set<T> states = ImmutableSet.copyOf(transitionStates); Preconditions.checkArgument(Iterables.all(states, Predicates.notNull())); @@ -360,12 +360,12 @@ public class StateMachine<T> { * @return A reference to the builder. */ public Builder<T> addState(T state, T... 
transitionStates) { - return addState(Closures.<Transition<T>>noop(), state, transitionStates); + return addState(Consumers.<Transition<T>>noop(), state, transitionStates); } private void onTransition(Predicate<Transition<T>> transitionFilter, - Closure<Transition<T>> handler) { - onAnyTransition(Closures.filter(transitionFilter, handler)); + Consumer<Transition<T>> handler) { + onAnyTransition(Consumers.filter(transitionFilter, handler)); } /** @@ -375,7 +375,7 @@ public class StateMachine<T> { * @param handler Callback to notify of transition attempts. * @return A reference to the builder. */ - public Builder<T> onAnyTransition(Closure<Transition<T>> handler) { + public Builder<T> onAnyTransition(Consumer<Transition<T>> handler) { transitionCallbacks.add(handler); return this; } @@ -413,7 +413,7 @@ public class StateMachine<T> { return new StateMachine<T>(name, initialState, stateTransitions, - Closures.combine(transitionCallbacks), + Consumers.combine(transitionCallbacks), throwOnBadTransition); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java ---------------------------------------------------------------------- diff --git a/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java index bcfa003..947e42e 100644 --- a/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java +++ b/commons/src/main/java/org/apache/aurora/common/util/templating/StringTemplateHelper.java @@ -15,13 +15,13 @@ package org.apache.aurora.common.util.templating; import java.io.IOException; import java.io.Writer; +import java.util.function.Consumer; import com.google.common.base.Preconditions; import org.antlr.stringtemplate.AutoIndentWriter; import org.antlr.stringtemplate.StringTemplate; import 
org.antlr.stringtemplate.StringTemplateGroup; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.common.base.MorePreconditions; /** @@ -74,19 +74,19 @@ public class StringTemplateHelper { * the unpopulated template object. * * @param out Template output writer. - * @param parameterSetter Closure to populate the template. + * @param parameterSetter Consumer to populate the template. * @throws TemplateException If an exception was encountered while populating the template. */ public void writeTemplate( Writer out, - Closure<StringTemplate> parameterSetter) throws TemplateException { + Consumer<StringTemplate> parameterSetter) throws TemplateException { Preconditions.checkNotNull(out); Preconditions.checkNotNull(parameterSetter); StringTemplate stringTemplate = group.getInstanceOf(templatePath); try { - parameterSetter.execute(stringTemplate); + parameterSetter.accept(stringTemplate); stringTemplate.write(new AutoIndentWriter(out)); } catch (IOException e) { throw new TemplateException("Failed to write template: " + e, e); http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java b/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java deleted file mode 100644 index 07bf23c..0000000 --- a/commons/src/test/java/org/apache/aurora/common/base/ClosuresTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.common.base; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; - -import org.apache.aurora.common.testing.easymock.EasyMockTest; -import org.junit.Test; - -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.junit.Assert.fail; - -/** - * @author John Sirois - */ -public class ClosuresTest extends EasyMockTest { - - private static final Clazz<Closure<Integer>> INT_CLOSURE_CLZ = new Clazz<Closure<Integer>>() { }; - static class Thrown extends RuntimeException { } - - @Test - public void testCombine() { - Closure<Integer> work1 = createMock(INT_CLOSURE_CLZ); - Closure<Integer> work2 = createMock(INT_CLOSURE_CLZ); - - @SuppressWarnings("unchecked") // Needed because type information lost in vargs. - Closure<Integer> wrapper = Closures.combine(ImmutableList.of(work1, work2)); - - work1.execute(1); - work2.execute(1); - - work1.execute(2); - work2.execute(2); - - control.replay(); - - wrapper.execute(1); - wrapper.execute(2); - } - - @Test - public void testCombineOneThrows() { - Closure<Integer> work1 = createMock(INT_CLOSURE_CLZ); - Closure<Integer> work2 = createMock(INT_CLOSURE_CLZ); - Closure<Integer> work3 = createMock(INT_CLOSURE_CLZ); - - @SuppressWarnings("unchecked") // Needed because type information lost in vargs. 
- Closure<Integer> wrapper = Closures.combine(ImmutableList.of(work1, work2, work3)); - - work1.execute(1); - expectLastCall().andThrow(new Thrown()); - - work1.execute(2); - work2.execute(2); - expectLastCall().andThrow(new Thrown()); - - work1.execute(3); - work2.execute(3); - work3.execute(3); - expectLastCall().andThrow(new Thrown()); - - control.replay(); - - try { - wrapper.execute(1); - fail("Should have thrown."); - } catch (Thrown e) { - // Expected. - } - - try { - wrapper.execute(2); - fail("Should have thrown."); - } catch (Thrown e) { - // Expected. - } - - try { - wrapper.execute(3); - fail("Should have thrown."); - } catch (Thrown e) { - // Expected. - } - } - - @Test - public void testFilter() { - Predicate<Integer> filter = createMock(new Clazz<Predicate<Integer>>() { }); - Closure<Integer> work = createMock(INT_CLOSURE_CLZ); - - expect(filter.apply(1)).andReturn(true); - work.execute(1); - - expect(filter.apply(2)).andReturn(false); - - Closure<Integer> filtered = Closures.filter(filter, work); - - control.replay(); - - filtered.execute(1); - filtered.execute(2); - } -} http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java b/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java new file mode 100644 index 0000000..f25dd19 --- /dev/null +++ b/commons/src/test/java/org/apache/aurora/common/base/ConsumersTest.java @@ -0,0 +1,118 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.aurora.common.base; + +import java.util.function.Consumer; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; + +import org.apache.aurora.common.testing.easymock.EasyMockTest; +import org.junit.Test; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.junit.Assert.fail; + +/** + * @author John Sirois + */ +public class ConsumersTest extends EasyMockTest { + + private static final Clazz<Consumer<Integer>> INT_CLOSURE_CLZ = new Clazz<Consumer<Integer>>() { }; + static class Thrown extends RuntimeException { } + + @Test + public void testCombine() { + Consumer<Integer> work1 = createMock(INT_CLOSURE_CLZ); + Consumer<Integer> work2 = createMock(INT_CLOSURE_CLZ); + + @SuppressWarnings("unchecked") // Needed because type information lost in vargs. + Consumer<Integer> wrapper = Consumers.combine(ImmutableList.of(work1, work2)); + + work1.accept(1); + work2.accept(1); + + work1.accept(2); + work2.accept(2); + + control.replay(); + + wrapper.accept(1); + wrapper.accept(2); + } + + @Test + public void testCombineOneThrows() { + Consumer<Integer> work1 = createMock(INT_CLOSURE_CLZ); + Consumer<Integer> work2 = createMock(INT_CLOSURE_CLZ); + Consumer<Integer> work3 = createMock(INT_CLOSURE_CLZ); + + @SuppressWarnings("unchecked") // Needed because type information lost in vargs. 
+ Consumer<Integer> wrapper = Consumers.combine(ImmutableList.of(work1, work2, work3)); + + work1.accept(1); + expectLastCall().andThrow(new Thrown()); + + work1.accept(2); + work2.accept(2); + expectLastCall().andThrow(new Thrown()); + + work1.accept(3); + work2.accept(3); + work3.accept(3); + expectLastCall().andThrow(new Thrown()); + + control.replay(); + + try { + wrapper.accept(1); + fail("Should have thrown."); + } catch (Thrown e) { + // Expected. + } + + try { + wrapper.accept(2); + fail("Should have thrown."); + } catch (Thrown e) { + // Expected. + } + + try { + wrapper.accept(3); + fail("Should have thrown."); + } catch (Thrown e) { + // Expected. + } + } + + @Test + public void testFilter() { + Predicate<Integer> filter = createMock(new Clazz<Predicate<Integer>>() { }); + Consumer<Integer> work = createMock(INT_CLOSURE_CLZ); + + expect(filter.apply(1)).andReturn(true); + work.accept(1); + + expect(filter.apply(2)).andReturn(false); + + Consumer<Integer> filtered = Consumers.filter(filter, work); + + control.replay(); + + filtered.accept(1); + filtered.accept(2); + } +} http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java ---------------------------------------------------------------------- diff --git a/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java b/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java index 9591e82..1c0ee47 100644 --- a/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java +++ b/commons/src/test/java/org/apache/aurora/common/util/StateMachineTest.java @@ -13,8 +13,9 @@ */ package org.apache.aurora.common.util; -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Closures; +import java.util.function.Consumer; + +import org.apache.aurora.common.base.Consumers; import org.apache.aurora.common.testing.easymock.EasyMockTest; import 
org.apache.aurora.common.util.StateMachine.Rule; import org.apache.aurora.common.util.StateMachine.Transition; @@ -199,32 +200,32 @@ public class StateMachineTest extends EasyMockTest { assertThat(machine.getState(), is(A)); } - private static final Clazz<Closure<Transition<String>>> TRANSITION_CLOSURE_CLZ = - new Clazz<Closure<Transition<String>>>() {}; + private static final Clazz<Consumer<Transition<String>>> TRANSITION_CLOSURE_CLZ = + new Clazz<Consumer<Transition<String>>>() {}; @Test public void testTransitionCallbacks() { - Closure<Transition<String>> anyTransition = createMock(TRANSITION_CLOSURE_CLZ); - Closure<Transition<String>> fromA = createMock(TRANSITION_CLOSURE_CLZ); - Closure<Transition<String>> fromB = createMock(TRANSITION_CLOSURE_CLZ); + Consumer<Transition<String>> anyTransition = createMock(TRANSITION_CLOSURE_CLZ); + Consumer<Transition<String>> fromA = createMock(TRANSITION_CLOSURE_CLZ); + Consumer<Transition<String>> fromB = createMock(TRANSITION_CLOSURE_CLZ); Transition<String> aToB = new Transition<>(A, B, true); - anyTransition.execute(aToB); - fromA.execute(aToB); + anyTransition.accept(aToB); + fromA.accept(aToB); Transition<String> bToB = new Transition<>(B, B, false); - anyTransition.execute(bToB); - fromB.execute(bToB); + anyTransition.accept(bToB); + fromB.accept(bToB); Transition<String> bToC = new Transition<>(B, C, true); - anyTransition.execute(bToC); - fromB.execute(bToC); + anyTransition.accept(bToC); + fromB.accept(bToC); - anyTransition.execute(new Transition<>(C, B, true)); + anyTransition.accept(new Transition<>(C, B, true)); Transition<String> bToD = new Transition<>(B, D, true); - anyTransition.execute(bToD); - fromB.execute(bToD); + anyTransition.accept(bToD); + fromB.accept(bToD); control.replay(); @@ -247,10 +248,10 @@ public class StateMachineTest extends EasyMockTest { @Test public void testFilteredTransitionCallbacks() { - Closure<Transition<String>> aToBHandler = createMock(TRANSITION_CLOSURE_CLZ); - 
Closure<Transition<String>> impossibleHandler = createMock(TRANSITION_CLOSURE_CLZ); + Consumer<Transition<String>> aToBHandler = createMock(TRANSITION_CLOSURE_CLZ); + Consumer<Transition<String>> impossibleHandler = createMock(TRANSITION_CLOSURE_CLZ); - aToBHandler.execute(new Transition<>(A, B, true)); + aToBHandler.accept(new Transition<>(A, B, true)); control.replay(); @@ -258,9 +259,9 @@ public class StateMachineTest extends EasyMockTest { .initialState(A) .addState(Rule .from(A).to(B, C) - .withCallback(Closures.filter(Transition.to(B), aToBHandler))) + .withCallback(Consumers.filter(Transition.to(B), aToBHandler))) .addState(Rule.from(B).to(A) - .withCallback(Closures.filter(Transition.to(B), impossibleHandler))) + .withCallback(Consumers.filter(Transition.to(B), impossibleHandler))) .addState(Rule.from(C).noTransitions()) .build(); http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java index debe899..195ab91 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerLifecycle.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import javax.inject.Inject; import javax.inject.Qualifier; @@ -39,8 +40,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.aurora.GuavaUtils.ServiceManagerIface; import org.apache.aurora.common.application.Lifecycle; import org.apache.aurora.common.application.ShutdownRegistry; -import org.apache.aurora.common.base.Closure; -import 
org.apache.aurora.common.base.Closures; +import org.apache.aurora.common.base.Consumers; import org.apache.aurora.common.base.ExceptionalCommand; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; @@ -222,17 +222,17 @@ public class SchedulerLifecycle implements EventSubscriber { } }); - final Closure<Transition<State>> prepareStorage = new Closure<Transition<State>>() { + final Consumer<Transition<State>> prepareStorage = new Consumer<Transition<State>>() { @Override - public void execute(Transition<State> transition) { + public void accept(Transition<State> transition) { storage.prepare(); stateMachine.transition(State.STORAGE_PREPARED); } }; - final Closure<Transition<State>> handleLeading = new Closure<Transition<State>>() { + final Consumer<Transition<State>> handleLeading = new Consumer<Transition<State>>() { @Override - public void execute(Transition<State> transition) { + public void accept(Transition<State> transition) { LOG.info("Elected as leading scheduler!"); storage.start(stores -> { @@ -258,9 +258,9 @@ public class SchedulerLifecycle implements EventSubscriber { } }; - final Closure<Transition<State>> handleRegistered = new Closure<Transition<State>>() { + final Consumer<Transition<State>> handleRegistered = new Consumer<Transition<State>>() { @Override - public void execute(Transition<State> transition) { + public void accept(Transition<State> transition) { registrationAcked.set(true); delayedActions.blockingDriverJoin(() -> { driver.blockUntilStopped(); @@ -279,10 +279,10 @@ public class SchedulerLifecycle implements EventSubscriber { } }; - final Closure<Transition<State>> shutDown = new Closure<Transition<State>>() { + final Consumer<Transition<State>> shutDown = new Consumer<Transition<State>>() { private final AtomicBoolean invoked = new AtomicBoolean(false); @Override - public void execute(Transition<State> transition) { + public void accept(Transition<State> transition) { if 
(!invoked.compareAndSet(false, true)) { LOG.info("Shutdown already invoked, ignoring extra call."); return; @@ -314,18 +314,18 @@ public class SchedulerLifecycle implements EventSubscriber { .initialState(State.IDLE) .logTransitions() .addState( - dieOnError(Closures.filter(NOT_DEAD, prepareStorage)), + dieOnError(Consumers.filter(NOT_DEAD, prepareStorage)), State.IDLE, State.PREPARING_STORAGE, State.DEAD) .addState( State.PREPARING_STORAGE, State.STORAGE_PREPARED, State.DEAD) .addState( - dieOnError(Closures.filter(NOT_DEAD, handleLeading)), + dieOnError(Consumers.filter(NOT_DEAD, handleLeading)), State.STORAGE_PREPARED, State.LEADER_AWAITING_REGISTRATION, State.DEAD) .addState( - dieOnError(Closures.filter(NOT_DEAD, handleRegistered)), + dieOnError(Consumers.filter(NOT_DEAD, handleRegistered)), State.LEADER_AWAITING_REGISTRATION, State.ACTIVE, State.DEAD) .addState( @@ -337,16 +337,16 @@ public class SchedulerLifecycle implements EventSubscriber { State.DEAD ) .onAnyTransition( - Closures.filter(IS_DEAD, shutDown)) + Consumers.filter(IS_DEAD, shutDown)) .build(); this.leadershipListener = new SchedulerCandidateImpl(stateMachine, leaderControl); } - private Closure<Transition<State>> dieOnError(final Closure<Transition<State>> closure) { + private Consumer<Transition<State>> dieOnError(final Consumer<Transition<State>> closure) { return transition -> { try { - closure.execute(transition); + closure.accept(transition); } catch (RuntimeException e) { LOG.error("Caught unchecked exception: " + e, e); stateMachine.transition(State.DEAD); http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java b/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java index 302388d..73455a0 100644 --- 
a/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java +++ b/src/main/java/org/apache/aurora/scheduler/http/JerseyTemplateServlet.java @@ -14,12 +14,12 @@ package org.apache.aurora.scheduler.http; import java.io.StringWriter; +import java.util.function.Consumer; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import org.antlr.stringtemplate.StringTemplate; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.common.util.templating.StringTemplateHelper; import org.apache.aurora.common.util.templating.StringTemplateHelper.TemplateException; @@ -36,7 +36,7 @@ class JerseyTemplateServlet { templateHelper = new StringTemplateHelper(getClass(), templatePath, true); } - protected final Response fillTemplate(Closure<StringTemplate> populator) { + protected final Response fillTemplate(Consumer<StringTemplate> populator) { StringWriter output = new StringWriter(); try { templateHelper.writeTemplate(output, populator); http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java index 6fd2951..23f256a 100644 --- a/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java +++ b/src/main/java/org/apache/aurora/scheduler/state/TaskStateMachine.java @@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.state; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import javax.annotation.Nullable; @@ -27,9 +28,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; -import org.apache.aurora.common.base.Closure; -import org.apache.aurora.common.base.Closures; import 
org.apache.aurora.common.base.Command; +import org.apache.aurora.common.base.Consumers; import org.apache.aurora.common.base.MorePreconditions; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.common.util.StateMachine; @@ -168,17 +168,17 @@ class TaskStateMachine { "A task that does not exist must start in DELETED state."); } - Closure<Transition<TaskState>> manageTerminatedTasks = Closures.combine( - ImmutableList.<Closure<Transition<TaskState>>>builder() + Consumer<Transition<TaskState>> manageTerminatedTasks = Consumers.combine( + ImmutableList.<Consumer<Transition<TaskState>>>builder() // Kill a task that we believe to be terminated when an attempt is made to revive. .add( - Closures.filter(Transition.to(ASSIGNED, STARTING, RUNNING), + Consumers.filter(Transition.to(ASSIGNED, STARTING, RUNNING), addFollowupClosure(KILL))) // Remove a terminated task that is requested to be deleted. - .add(Closures.filter(Transition.to(DELETED), addFollowupClosure(DELETE))) + .add(Consumers.filter(Transition.to(DELETED), addFollowupClosure(DELETE))) .build()); - final Closure<Transition<TaskState>> manageRestartingTask = + final Consumer<Transition<TaskState>> manageRestartingTask = transition -> { switch (transition.getTo()) { case ASSIGNED: @@ -243,8 +243,8 @@ class TaskStateMachine { } }; - final Closure<Transition<TaskState>> deleteIfKilling = - Closures.filter(Transition.to(KILLING), addFollowupClosure(DELETE)); + final Consumer<Transition<TaskState>> deleteIfKilling = + Consumers.filter(Transition.to(KILLING), addFollowupClosure(DELETE)); stateMachine = StateMachine.<TaskState>builder(name) .logTransitions() @@ -432,9 +432,9 @@ class TaskStateMachine { // Since we want this action to be performed last in the transition sequence, the callback // must be the last chained transition callback. 
.onAnyTransition( - new Closure<Transition<TaskState>>() { + new Consumer<Transition<TaskState>>() { @Override - public void execute(final Transition<TaskState> transition) { + public void accept(final Transition<TaskState> transition) { if (transition.isValidStateChange()) { TaskState from = transition.getFrom(); TaskState to = transition.getTo(); @@ -475,7 +475,7 @@ class TaskStateMachine { sideEffects.add(sideEffect); } - private Closure<Transition<TaskState>> addFollowupClosure(final Action action) { + private Consumer<Transition<TaskState>> addFollowupClosure(final Action action) { return item -> addFollowup(action); } http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java index 5143668..f586186 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import javax.inject.Inject; @@ -29,7 +30,6 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; import org.apache.aurora.common.application.ShutdownRegistry; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.common.inject.TimedInterceptor.Timed; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Time; @@ -204,8 +204,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore private final SlidingStats writerWaitStats = new 
SlidingStats("log_storage_write_lock_wait", "ns"); - private final Map<LogEntry._Fields, Closure<LogEntry>> logEntryReplayActions; - private final Map<Op._Fields, Closure<Op>> transactionReplayActions; + private final Map<LogEntry._Fields, Consumer<LogEntry>> logEntryReplayActions; + private final Map<Op._Fields, Consumer<Op>> transactionReplayActions; @Inject LogStorage( @@ -303,8 +303,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore } @VisibleForTesting - final Map<LogEntry._Fields, Closure<LogEntry>> buildLogEntryReplayActions() { - return ImmutableMap.<LogEntry._Fields, Closure<LogEntry>>builder() + final Map<LogEntry._Fields, Consumer<LogEntry>> buildLogEntryReplayActions() { + return ImmutableMap.<LogEntry._Fields, Consumer<LogEntry>>builder() .put(LogEntry._Fields.SNAPSHOT, logEntry -> { Snapshot snapshot = logEntry.getSnapshot(); LOG.info("Applying snapshot taken on " + new Date(snapshot.getTimestamp())); @@ -322,8 +322,8 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore } @VisibleForTesting - final Map<Op._Fields, Closure<Op>> buildTransactionReplayActions() { - return ImmutableMap.<Op._Fields, Closure<Op>>builder() + final Map<Op._Fields, Consumer<Op>> buildTransactionReplayActions() { + return ImmutableMap.<Op._Fields, Consumer<Op>>builder() .put( Op._Fields.SAVE_FRAMEWORK_ID, op -> writeBehindSchedulerStore.saveFrameworkId(op.getSaveFrameworkId().getId())) @@ -451,7 +451,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore throw new IllegalStateException("Unknown log entry type: " + entryField); } - logEntryReplayActions.get(entryField).execute(logEntry); + logEntryReplayActions.get(entryField).accept(logEntry); } private void replayOp(Op op) { @@ -460,7 +460,7 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore throw new IllegalStateException("Unknown transaction op: " + opField); } - 
transactionReplayActions.get(opField).execute(op); + transactionReplayActions.get(opField).accept(op); } private void scheduleSnapshots() { http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java index 76a574f..ea147c0 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java @@ -13,7 +13,8 @@ */ package org.apache.aurora.scheduler.storage.log; -import org.apache.aurora.common.base.Closure; +import java.util.function.Consumer; + import org.apache.aurora.gen.storage.LogEntry; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.log.Log; @@ -24,7 +25,7 @@ import static org.apache.aurora.scheduler.log.Log.Stream.StreamAccessException; /** * Manages interaction with the log stream. Log entries can be - * {@link #readFromBeginning(Closure) read from} the beginning, + * {@link #readFromBeginning(Consumer) read from} the beginning, * a {@link #startTransaction() transaction} consisting of one or more local storage * operations can be committed atomically, or the log can be compacted by * {@link #snapshot(org.apache.aurora.gen.storage.Snapshot) snapshotting}. @@ -39,7 +40,7 @@ public interface StreamManager { * @throws InvalidPositionException if the given position is not found in the log. * @throws StreamAccessException if there is a problem reading from the log. 
*/ - void readFromBeginning(Closure<LogEntry> reader) + void readFromBeginning(Consumer<LogEntry> reader) throws CodingException, InvalidPositionException, StreamAccessException; /** http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java index 766ec2d..baf2647 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import javax.annotation.Nullable; import javax.inject.Inject; @@ -32,7 +33,6 @@ import com.google.common.hash.Hasher; import com.google.common.primitives.Bytes; import com.google.inject.assistedinject.Assisted; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.common.stats.Stats; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.storage.Frame; @@ -95,7 +95,7 @@ class StreamManagerImpl implements StreamManager { } @Override - public void readFromBeginning(Closure<LogEntry> reader) + public void readFromBeginning(Consumer<LogEntry> reader) throws CodingException, InvalidPositionException, StreamAccessException { Iterator<Log.Entry> entries = stream.readAll(); @@ -116,7 +116,7 @@ class StreamManagerImpl implements StreamManager { snapshotDeduplicator.reduplicate(logEntry.getDeduplicatedSnapshot())); } - reader.execute(logEntry); + reader.accept(logEntry); vars.entriesRead.incrementAndGet(); } } 
http://git-wip-us.apache.org/repos/asf/aurora/blob/03ec0238/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java index 0256c06..7344051 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; +import java.util.function.Consumer; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; @@ -32,7 +33,6 @@ import com.google.common.hash.Hashing; import org.apache.aurora.codec.ThriftBinaryCodec; import org.apache.aurora.codec.ThriftBinaryCodec.CodingException; -import org.apache.aurora.common.base.Closure; import org.apache.aurora.common.quantity.Amount; import org.apache.aurora.common.quantity.Data; import org.apache.aurora.common.testing.easymock.EasyMockTest; @@ -111,7 +111,7 @@ public class LogManagerTest extends EasyMockTest { public void testStreamManagerReadFromUnknownNone() throws CodingException { expect(stream.readAll()).andReturn(Iterators.emptyIterator()); - Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { }); + Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); control.replay(); @@ -126,8 +126,8 @@ public class LogManagerTest extends EasyMockTest { expect(entry1.contents()).andReturn(encode(transaction1)); expect(stream.readAll()).andReturn(Iterators.singletonIterator(entry1)); - Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { }); - reader.execute(transaction1); + Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); + 
reader.accept(transaction1); control.replay(); @@ -468,9 +468,9 @@ public class LogManagerTest extends EasyMockTest { expect(stream.readAll()).andReturn(entries.iterator()); - Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { }); - reader.execute(transaction1); - reader.execute(transaction2); + Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); + reader.accept(transaction1); + reader.accept(transaction2); StreamManager streamManager = createStreamManager(message.chunkSize); control.replay(); @@ -493,8 +493,8 @@ public class LogManagerTest extends EasyMockTest { expect(stream.readAll()).andReturn(ImmutableList.of(snapshotEntry).iterator()); - Closure<LogEntry> reader = createMock(new Clazz<Closure<LogEntry>>() { }); - reader.execute(snapshotLogEntry); + Consumer<LogEntry> reader = createMock(new Clazz<Consumer<LogEntry>>() { }); + reader.accept(snapshotLogEntry); control.replay();
