Repository: incubator-streams Updated Branches: refs/heads/master 00363f213 -> 4ac71f485
Trivial Anonymous -> Java 8 lambdas, replace assert with TestNg.Assert in Integration tests Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4ac71f48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4ac71f48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4ac71f48 Branch: refs/heads/master Commit: 4ac71f4856a8172dbca0309e48da03470e4365e1 Parents: 00363f2 Author: smarthi <smar...@apache.org> Authored: Sat Dec 17 17:29:40 2016 -0500 Committer: smarthi <smar...@apache.org> Committed: Sat Dec 17 17:46:39 2016 -0500 ---------------------------------------------------------------------- .../http/provider/SimpleHttpProvider.java | 9 ++--- .../pagefeed/FacebookPageFeedProviderIT.java | 2 +- .../providers/GPlusUserActivityProviderIT.java | 2 +- .../InstagramRecentMediaProviderIT.java | 11 +++--- .../providers/InstagramUserInfoProviderIT.java | 16 ++++----- .../providers/TwitterFollowingProviderIT.java | 14 ++++---- .../test/providers/TwitterStreamProviderIT.java | 2 +- .../providers/TwitterTimelineProviderIT.java | 13 ++++--- .../TwitterUserInformationProviderIT.java | 9 +++-- .../youtube/provider/YoutubeProviderTest.java | 13 +++---- .../providers/YoutubeChannelProviderIT.java | 2 +- .../YoutubeUserActivityProviderIT.java | 11 +++--- ...amOnUnhandleThrowableThreadPoolExecutor.java | 4 +-- .../local/builders/LocalStreamBuilderTest.java | 7 +--- ...nhandledThrowableThreadPoolExecutorTest.java | 36 ++++++-------------- 15 files changed, 59 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java ---------------------------------------------------------------------- diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java index eb20d9c..9fa0aeb 100644 --- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java +++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java @@ -193,15 +193,12 @@ public class SimpleHttpProvider implements StreamsProvider { @Override public void startStream() { - executor.execute(new Runnable() { - @Override - public void run() { + executor.execute(() -> { - readCurrent(); + readCurrent(); - Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); - } }); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java index 2c22d46..aeb01dd 100644 --- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java +++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java @@ -44,7 +44,7 @@ public class FacebookPageFeedProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { FacebookPageFeedProvider.main(args); } catch( Exception e ) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java index ab57ab3..71e825f 100644 --- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java +++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java @@ -41,7 +41,7 @@ public class GPlusUserActivityProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { GPlusUserActivityProvider.main(args); } catch ( Exception ex ) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java index 660469b..8507db5 100644 --- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java +++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java @@ -22,6 +22,7 @@ import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaPro import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.Test; import java.io.File; @@ -45,7 +46,7 @@ public class InstagramRecentMediaProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { InstagramRecentMediaProvider.main(args); } catch ( Exception ex ) { @@ -56,9 +57,9 @@ public class InstagramRecentMediaProviderIT { testThread.join(60000); File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + Assert.assertTrue (out.exists()); + Assert.assertTrue (out.canRead()); + Assert.assertTrue (out.isFile()); FileReader outReader = new FileReader(out); LineNumberReader outCounter = new LineNumberReader(outReader); @@ -66,6 +67,6 @@ public class InstagramRecentMediaProviderIT { while (outCounter.readLine() != null) { } - assert (outCounter.getLineNumber() >= 1); + Assert.assertTrue (outCounter.getLineNumber() >= 1); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java index f8f0ce3..ba125e7 100644 --- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java +++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java @@ -22,17 +22,13 @@ import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.Test; import java.io.File; import java.io.FileReader; import java.io.LineNumberReader; -import static org.testng.Assert.assertTrue; - -/** - * Created by sblackmon on 10/12/16. - */ public class InstagramUserInfoProviderIT { private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoProviderIT.class); @@ -47,7 +43,7 @@ public class InstagramUserInfoProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { InstagramUserInfoProvider.main(args); } catch ( Exception ex ) { @@ -58,16 +54,16 @@ public class InstagramUserInfoProviderIT { testThread.join(60000); File out = new File(outfile); - assertTrue (out.exists()); - assertTrue (out.canRead()); - assertTrue (out.isFile()); + Assert.assertTrue (out.exists()); + Assert.assertTrue (out.canRead()); + Assert.assertTrue (out.isFile()); FileReader outReader = new FileReader(out); LineNumberReader outCounter = new LineNumberReader(outReader); while (outCounter.readLine() != null) {} - assertTrue (outCounter.getLineNumber() >= 1); + Assert.assertTrue (outCounter.getLineNumber() >= 1); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java index f0f6f76..1affc74 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java @@ -22,15 +22,13 @@ import org.apache.streams.twitter.provider.TwitterFollowingProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.Test; import java.io.File; import java.io.FileReader; import java.io.LineNumberReader; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - public class TwitterFollowingProviderIT { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderIT.class); @@ -45,7 +43,7 @@ public class TwitterFollowingProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { TwitterFollowingProvider.main(args); } catch ( Exception ex ) { @@ -56,16 +54,16 @@ public class TwitterFollowingProviderIT { testThread.join(60000); File out = new File(outfile); - assertTrue (out.exists()); - assertTrue (out.canRead()); - assertTrue (out.isFile()); + Assert.assertTrue (out.exists()); + Assert.assertTrue (out.canRead()); + Assert.assertTrue (out.isFile()); FileReader outReader = new FileReader(out); LineNumberReader outCounter = new LineNumberReader(outReader); while (outCounter.readLine() != null) {} - assertEquals (outCounter.getLineNumber(), 10000); + Assert.assertEquals (outCounter.getLineNumber(), 10000); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java index d0a99a9..1048de0 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java @@ -46,7 +46,7 @@ public class TwitterStreamProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { TwitterStreamProvider.main(args); } catch ( Exception ex ) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java index 750c7be..117cba4 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java @@ -22,14 +22,13 @@ import org.apache.streams.twitter.provider.TwitterTimelineProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.Test; import java.io.File; import java.io.FileReader; import java.io.LineNumberReader; -import static org.testng.Assert.assertEquals; - public class TwitterTimelineProviderIT { private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderIT.class); @@ -44,7 +43,7 @@ public class TwitterTimelineProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { TwitterTimelineProvider.main(args); } catch ( Exception ex ) { @@ -55,16 +54,16 @@ public class TwitterTimelineProviderIT { testThread.join(60000); File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + Assert.assertTrue (out.exists()); + Assert.assertTrue (out.canRead()); + Assert.assertTrue (out.isFile()); FileReader outReader = new FileReader(out); LineNumberReader outCounter = new LineNumberReader(outReader); while (outCounter.readLine() != null) {} - assertEquals (outCounter.getLineNumber(), 1000); + Assert.assertEquals (outCounter.getLineNumber(), 1000); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java index 1748700..61c1031 100644 --- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java +++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java @@ -31,7 +31,6 @@ import java.io.LineNumberReader; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; -import static org.testng.Assert.assertTrue; public class TwitterUserInformationProviderIT { @@ -47,7 +46,7 @@ public class TwitterUserInformationProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { TwitterUserInformationProvider.main(args); } catch ( Exception ex ) { @@ -58,9 +57,9 @@ public class TwitterUserInformationProviderIT { testThread.join(60000); File out = new File(outfile); - assertTrue(out.exists()); - assertTrue(out.canRead()); - assertTrue(out.isFile()); + Assert.assertTrue (out.exists()); + Assert.assertTrue (out.canRead()); + Assert.assertTrue (out.isFile()); FileReader outReader = new FileReader(out); LineNumberReader outCounter = new LineNumberReader(outReader); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java index 54c0375..121c3d5 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java @@ -153,14 +153,11 @@ public class YoutubeProviderTest { @Override protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) { final BlockingQueue<StreamsDatum> q = queue; - return new Runnable() { - @Override - public void run() { - try { - q.put(new StreamsDatum(null)); - } catch (InterruptedException ie) { - fail("Test was interrupted"); - } + return () -> { + try { + q.put(new StreamsDatum(null)); + } catch (InterruptedException ie) { + fail("Test was interrupted"); } }; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java index 0edbc37..f551d93 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java @@ -46,7 +46,7 @@ public class YoutubeChannelProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { YoutubeChannelProvider.main(args); } catch ( Exception ex ) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java index d97a119..34442a2 100644 --- a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java +++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java @@ -22,6 +22,7 @@ import com.youtube.provider.YoutubeUserActivityProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.Test; import java.io.File; @@ -45,7 +46,7 @@ public class YoutubeUserActivityProviderIT { args[0] = configfile; args[1] = outfile; - Thread testThread = new Thread((Runnable) () -> { + Thread testThread = new Thread(() -> { try { YoutubeUserActivityProvider.main(args); } catch ( Exception ex ) { @@ -56,16 +57,16 @@ public class YoutubeUserActivityProviderIT { testThread.join(60000); File out = new File(outfile); - assert (out.exists()); - assert (out.canRead()); - assert (out.isFile()); + Assert.assertTrue (out.exists()); + Assert.assertTrue (out.canRead()); + Assert.assertTrue (out.isFile()); FileReader outReader = new FileReader(out); LineNumberReader outCounter = new LineNumberReader(outReader); while (outCounter.readLine() != null) {} - assert (outCounter.getLineNumber() >= 250); + Assert.assertTrue(outCounter.getLineNumber() >= 250); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java index 38bda24..64dd72d 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java @@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit; /** * A fixed ThreadPoolExecutor that will shutdown a stream upon a thread ending execution due to an unhandled throwable. - * @see {@link java.util.concurrent.ThreadPoolExecutor} + * @see java.util.concurrent.ThreadPoolExecutor */ public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor { @@ -44,7 +44,7 @@ public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadP * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable */ public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) { - super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); this.streamBuilder = streamBuilder; this.isStoped = false; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java index d37209c..06e2762 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java @@ -334,12 +334,7 @@ public class LocalStreamBuilderTest extends RandomizedTest { builder.newPerpetualStream("prov1", new NumericMessageProvider(30)) .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1") .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1"); - service.submit(new Runnable() { - @Override - public void run() { - builder.start(); - } - }); + service.submit(builder::start); //Let streams spin up threads and start to process Thread.sleep(500); builder.stop(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java index e3b608d..0048153 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java @@ -23,8 +23,6 @@ import org.apache.streams.util.ComponentUtils; import org.junit.After; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -55,22 +53,16 @@ public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest { public void testShutDownOnException() { LocalStreamBuilder sb = mock(LocalStreamBuilder.class); final AtomicBoolean isShutdown = new AtomicBoolean(false); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - isShutdown.set(true); - return null; - } + doAnswer(invocationOnMock -> { + isShutdown.set(true); + return null; }).when(sb).stop(); final CountDownLatch latch = new CountDownLatch(1); - Runnable runnable = new Runnable() { - @Override - public void run() { - latch.countDown(); - throw new RuntimeException("Testing Throwable Handling!"); - } + Runnable runnable = () -> { + latch.countDown(); + throw new RuntimeException("Testing Throwable Handling!"); }; ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); @@ -94,22 +86,14 @@ public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest { public void testNormalExecution() { LocalStreamBuilder sb = mock(LocalStreamBuilder.class); final AtomicBoolean isShutdown = new AtomicBoolean(false); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - isShutdown.set(true); - return null; - } + doAnswer(invocationOnMock -> { + isShutdown.set(true); + return null; }).when(sb).stop(); final CountDownLatch latch = new CountDownLatch(1); - Runnable runnable = new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }; + Runnable runnable = latch::countDown; ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); executor.execute(runnable);