Repository: reef Updated Branches: refs/heads/master 72fd9de12 -> 3056022be
[REEF-1006] Remove TimeoutSubject and tests from Wake `TimeoutSubject` is deprecated in REEF-535 of Release 0.14. This change removes TimeoutSubject.java and TimeoutSubjectTest.java in reef-wake. JIRA: [REEF-1006](https://issues.apache.org/jira/browse/REEF-1006) Pull Request: This closes #861 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3056022b Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3056022b Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3056022b Branch: refs/heads/master Commit: 3056022be5325cffcd158bc0934bcc72abb2b74b Parents: 72fd9de Author: Dongjoon Hyun <[email protected]> Authored: Thu Feb 25 13:19:30 2016 -0800 Committer: Mariia Mykhailova <[email protected]> Committed: Thu Feb 25 14:21:36 2016 -0800 ---------------------------------------------------------------------- .../reef/wake/rx/impl/TimeoutSubject.java | 93 ------------ .../reef/wake/test/rx/TimeoutSubjectTest.java | 146 ------------------- 2 files changed, 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/3056022b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java deleted file mode 100644 index 89bba0f..0000000 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.wake.rx.impl; - -import org.apache.reef.wake.rx.Observer; -import org.apache.reef.wake.rx.Subject; - -import java.util.concurrent.TimeoutException; - -/** - * A class implementing {@code Subject<T>} with timeout. - * - * @param <T> a type of subject - * @deprecated in 0.14 as unused - */ -@Deprecated -public class TimeoutSubject<T> implements Subject<T, T> { - private Thread timeBomb; - private Observer<T> destination; - private boolean finished; - - public TimeoutSubject(final long timeout, final Observer<T> handler) { - this.finished = false; - this.destination = handler; - final TimeoutSubject<T> outer = this; - this.timeBomb = new Thread(new Runnable() { - @Override - public void run() { - final boolean finishedCopy; - synchronized (outer) { - if (!finished) { - try { - outer.wait(timeout); - } catch (final InterruptedException e) { - return; - } - } - finishedCopy = finished; - finished = true; // lock out the caller from putting event through now - } - if (!finishedCopy) { - destination.onError(new TimeoutException("TimeoutSubject expired")); - } - } - }); - this.timeBomb.start(); - } - - @Override - public void onNext(final T value) { - final boolean wasFinished; - synchronized (this) { - wasFinished = finished; - if (!finished) { - this.notify(); - finished = true; - } - } - if (!wasFinished) { - // TODO[JIRA unneeded due to deprecation]: change Subject to specify conversion to T - destination.onNext(value); - destination.onCompleted(); - } - } - - @Override - public void onError(final Exception error) { - this.timeBomb.interrupt(); - destination.onError(error); - } - - @Override - public void onCompleted() { - throw new IllegalStateException("Should not be called directly"); - } - -} http://git-wip-us.apache.org/repos/asf/reef/blob/3056022b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java deleted file mode 100644 index c433b81..0000000 --- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.reef.wake.test.rx; - -import org.apache.reef.wake.rx.Observer; -import org.apache.reef.wake.rx.Subject; -import org.apache.reef.wake.rx.impl.TimeoutSubject; -import org.junit.Test; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.*; - -/** - * Tests for TimeoutSubject. - */ -public class TimeoutSubjectTest { - - @Test - public void testSuccess() { - final AtomicInteger nexts = new AtomicInteger(0); - final AtomicInteger completes = new AtomicInteger(0); - final int delta = 400; - final Subject<Integer, Integer> dut = new TimeoutSubject<>(10000, new Observer<Integer>() { - - @Override - public void onNext(final Integer value) { - nexts.addAndGet(delta); - } - - @Override - public void onError(final Exception error) { - fail(error.toString()); - } - - @Override - public void onCompleted() { - assertEquals(delta, nexts.get()); - completes.incrementAndGet(); - } - }); - dut.onNext(delta); - - assertEquals(delta, nexts.get()); - assertEquals(1, completes.get()); - } - - @Test - public void testDifferentThread() { - final AtomicInteger nexts = new AtomicInteger(0); - final AtomicInteger completes = new AtomicInteger(0); - final int delta = 400; - final Subject<Integer, Integer> dut = new TimeoutSubject<>(10000, new Observer<Integer>() { - - @Override - public void onNext(final Integer value) { - nexts.addAndGet(delta); - } - - @Override - public void onError(final Exception error) { - fail(error.toString()); - } - - @Override - public void onCompleted() { - assertEquals(delta, nexts.get()); - completes.incrementAndGet(); - } - }); - - final ExecutorService e = Executors.newSingleThreadExecutor(); - e.submit(new Runnable() { - @Override - public void run() { - dut.onNext(delta); - } - }); - - e.shutdown(); - try { - e.awaitTermination(11000, TimeUnit.MILLISECONDS); - } catch (final InterruptedException e1) { - e1.printStackTrace(); - fail(e1.toString()); - } - - assertEquals(delta, nexts.get()); - assertEquals(1, completes.get()); - } - - @Test - public void testTimeout() { - final int timeout = 1; - final int sleep = 500; - final AtomicInteger errors = new AtomicInteger(0); - final Subject<Integer, Integer> dut = new TimeoutSubject<>(timeout, new Observer<Integer>() { - - @Override - public void onNext(final Integer value) { - fail("Should not get called"); - } - - @Override - public void onError(final Exception error) { - assertTrue(error instanceof TimeoutException); - errors.incrementAndGet(); - } - - @Override - public void onCompleted() { - fail("Should not get called"); - } - }); - - try { - Thread.sleep(sleep); - } catch (final InterruptedException e) { - e.printStackTrace(); - fail(e.toString()); - } - dut.onNext(0xC0FFEE); - - assertEquals(1, errors.get()); - } -}
