http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java deleted file mode 100644 index 85c64d0..0000000 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestExecutorsTest.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.fn.harness.test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.junit.runners.model.Statement; - -/** Tests for {@link TestExecutors}. */ -@RunWith(JUnit4.class) -public class TestExecutorsTest { - @Test - public void testSuccessfulTermination() throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); - final AtomicBoolean taskRan = new AtomicBoolean(); - testService - .apply( - new Statement() { - @Override - public void evaluate() throws Throwable { - testService.submit(() -> taskRan.set(true)); - } - }, - null) - .evaluate(); - assertTrue(service.isTerminated()); - assertTrue(taskRan.get()); - } - - @Test - public void testTaskBlocksForeverCausesFailure() throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); - final AtomicBoolean taskStarted = new AtomicBoolean(); - final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); - try { - testService - .apply( - new Statement() { - @Override - public void evaluate() throws Throwable { - testService.submit(this::taskToRun); - } - - private void taskToRun() { - taskStarted.set(true); - try { - while (true) { - Thread.sleep(10000); - } - } catch (InterruptedException e) { - taskWasInterrupted.set(true); - return; - } - } - }, - null) - .evaluate(); - fail(); - } catch (IllegalStateException e) { - assertEquals(IllegalStateException.class, e.getClass()); - assertEquals("Test executor failed to shutdown cleanly.", e.getMessage()); - } - assertTrue(service.isShutdown()); - } - - @Test - public void testStatementFailurePropagatedCleanly() throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); - final RuntimeException exceptionToThrow = new RuntimeException(); - try { - testService - .apply( - new Statement() { - @Override - public void evaluate() throws Throwable { - throw exceptionToThrow; - } - }, - null) - .evaluate(); - fail(); - } catch (RuntimeException thrownException) { - assertSame(exceptionToThrow, thrownException); - } - assertTrue(service.isShutdown()); - } - - @Test - public void testStatementFailurePropagatedWhenExecutorServiceFailingToTerminate() - throws Throwable { - ExecutorService service = Executors.newSingleThreadExecutor(); - final TestExecutorService testService = TestExecutors.from(() -> service); - final AtomicBoolean taskStarted = new AtomicBoolean(); - final AtomicBoolean taskWasInterrupted = new AtomicBoolean(); - final RuntimeException exceptionToThrow = new RuntimeException(); - try { - testService - .apply( - new Statement() { - @Override - public void evaluate() throws Throwable { - testService.submit(this::taskToRun); - throw exceptionToThrow; - } - - private void taskToRun() { - taskStarted.set(true); - try { - while (true) { - Thread.sleep(10000); - } - } catch (InterruptedException e) { - taskWasInterrupted.set(true); - return; - } - } - }, - null) - .evaluate(); - fail(); - } catch (RuntimeException thrownException) { - assertSame(exceptionToThrow, thrownException); - assertEquals(1, exceptionToThrow.getSuppressed().length); - assertEquals(IllegalStateException.class, exceptionToThrow.getSuppressed()[0].getClass()); - assertEquals( - "Test executor failed to shutdown cleanly.", - exceptionToThrow.getSuppressed()[0].getMessage()); - } - assertTrue(service.isShutdown()); - } -}
http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java deleted file mode 100644 index f398286..0000000 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreams.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.fn.harness.test; - -import io.grpc.stub.CallStreamObserver; -import io.grpc.stub.StreamObserver; -import java.util.function.Consumer; -import java.util.function.Supplier; - -/** Utility methods which enable testing of {@link StreamObserver}s. */ -public class TestStreams { - /** - * Creates a test {@link CallStreamObserver} {@link Builder} that forwards - * {@link StreamObserver#onNext} calls to the supplied {@link Consumer}. - */ - public static <T> Builder<T> withOnNext(Consumer<T> onNext) { - return new Builder<>(new ForwardingCallStreamObserver<>( - onNext, - TestStreams::noop, - TestStreams::noop, - TestStreams::returnTrue)); - } - - /** A builder for a test {@link CallStreamObserver} that performs various callbacks. */ - public static class Builder<T> { - private final ForwardingCallStreamObserver<T> observer; - private Builder(ForwardingCallStreamObserver<T> observer) { - this.observer = observer; - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link CallStreamObserver#isReady} callback. - */ - public Builder<T> withIsReady(Supplier<Boolean> isReady) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, - observer.onError, - observer.onCompleted, - isReady)); - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link StreamObserver#onCompleted} callback. - */ - public Builder<T> withOnCompleted(Runnable onCompleted) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, - observer.onError, - onCompleted, - observer.isReady)); - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link StreamObserver#onError} callback. - */ - public Builder<T> withOnError(Runnable onError) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, - new Consumer<Throwable>() { - @Override - public void accept(Throwable t) { - onError.run(); - } - }, - observer.onCompleted, - observer.isReady)); - } - - /** - * Returns a new {@link Builder} like this one with the specified - * {@link StreamObserver#onError} consumer. - */ - public Builder<T> withOnError(Consumer<Throwable> onError) { - return new Builder<>(new ForwardingCallStreamObserver<>( - observer.onNext, onError, observer.onCompleted, observer.isReady)); - } - - public CallStreamObserver<T> build() { - return observer; - } - } - - private static void noop() { - } - - private static void noop(Throwable t) { - } - - private static boolean returnTrue() { - return true; - } - - /** A {@link CallStreamObserver} which executes the supplied callbacks. */ - private static class ForwardingCallStreamObserver<T> extends CallStreamObserver<T> { - private final Consumer<T> onNext; - private final Supplier<Boolean> isReady; - private final Consumer<Throwable> onError; - private final Runnable onCompleted; - - public ForwardingCallStreamObserver( - Consumer<T> onNext, - Consumer<Throwable> onError, - Runnable onCompleted, - Supplier<Boolean> isReady) { - this.onNext = onNext; - this.onError = onError; - this.onCompleted = onCompleted; - this.isReady = isReady; - } - - @Override - public void onNext(T value) { - onNext.accept(value); - } - - @Override - public void onError(Throwable t) { - onError.accept(t); - } - - @Override - public void onCompleted() { - onCompleted.run(); - } - - @Override - public boolean isReady() { - return isReady.get(); - } - - @Override - public void setOnReadyHandler(Runnable onReadyHandler) {} - - @Override - public void disableAutoInboundFlowControl() {} - - @Override - public void request(int count) {} - - @Override - public void setMessageCompression(boolean enable) {} - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java deleted file mode 100644 index b684c90..0000000 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/test/TestStreamsTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.fn.harness.test; - -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link TestStreams}. */ -@RunWith(JUnit4.class) -public class TestStreamsTest { - @Test - public void testOnNextIsCalled() { - AtomicBoolean onNextWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(onNextWasCalled::set).build().onNext(true); - assertTrue(onNextWasCalled.get()); - } - - @Test - public void testIsReadyIsCalled() { - final AtomicBoolean isReadyWasCalled = new AtomicBoolean(); - assertFalse(TestStreams.withOnNext(null) - .withIsReady(() -> isReadyWasCalled.getAndSet(true)) - .build() - .isReady()); - assertTrue(isReadyWasCalled.get()); - } - - @Test - public void testOnCompletedIsCalled() { - AtomicBoolean onCompletedWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(null) - .withOnCompleted(() -> onCompletedWasCalled.set(true)) - .build() - .onCompleted(); - assertTrue(onCompletedWasCalled.get()); - } - - @Test - public void testOnErrorRunnableIsCalled() { - RuntimeException throwable = new RuntimeException(); - AtomicBoolean onErrorWasCalled = new AtomicBoolean(); - TestStreams.withOnNext(null) - .withOnError(() -> onErrorWasCalled.set(true)) - .build() - .onError(throwable); - assertTrue(onErrorWasCalled.get()); - } - - @Test - public void testOnErrorConsumerIsCalled() { - RuntimeException throwable = new RuntimeException(); - Collection<Throwable> onErrorWasCalled = new ArrayList<>(); - TestStreams.withOnNext(null) - .withOnError(onErrorWasCalled::add) - .build() - .onError(throwable); - assertThat(onErrorWasCalled, contains(throwable)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/fdd5971d/sdks/java/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index e5af784..62e4ec3 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -53,6 +53,7 @@ <jdk>[1.8,)</jdk> </activation> <modules> + <module>fn-execution</module> <module>harness</module> <module>container</module> <module>java8tests</module>
