Repository: incubator-streams
Updated Branches:
  refs/heads/master 00363f213 -> 4ac71f485


Trivial: convert anonymous classes to Java 8 lambdas and replace assert with TestNG Assert in integration tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/4ac71f48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/4ac71f48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/4ac71f48

Branch: refs/heads/master
Commit: 4ac71f4856a8172dbca0309e48da03470e4365e1
Parents: 00363f2
Author: smarthi <smar...@apache.org>
Authored: Sat Dec 17 17:29:40 2016 -0500
Committer: smarthi <smar...@apache.org>
Committed: Sat Dec 17 17:46:39 2016 -0500

----------------------------------------------------------------------
 .../http/provider/SimpleHttpProvider.java       |  9 ++---
 .../pagefeed/FacebookPageFeedProviderIT.java    |  2 +-
 .../providers/GPlusUserActivityProviderIT.java  |  2 +-
 .../InstagramRecentMediaProviderIT.java         | 11 +++---
 .../providers/InstagramUserInfoProviderIT.java  | 16 ++++-----
 .../providers/TwitterFollowingProviderIT.java   | 14 ++++----
 .../test/providers/TwitterStreamProviderIT.java |  2 +-
 .../providers/TwitterTimelineProviderIT.java    | 13 ++++---
 .../TwitterUserInformationProviderIT.java       |  9 +++--
 .../youtube/provider/YoutubeProviderTest.java   | 13 +++----
 .../providers/YoutubeChannelProviderIT.java     |  2 +-
 .../YoutubeUserActivityProviderIT.java          | 11 +++---
 ...amOnUnhandleThrowableThreadPoolExecutor.java |  4 +--
 .../local/builders/LocalStreamBuilderTest.java  |  7 +---
 ...nhandledThrowableThreadPoolExecutorTest.java | 36 ++++++--------------
 15 files changed, 59 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
index eb20d9c..9fa0aeb 100644
--- a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
+++ b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
@@ -193,15 +193,12 @@ public class SimpleHttpProvider implements StreamsProvider {
   @Override
   public void startStream() {
 
-    executor.execute(new Runnable() {
-      @Override
-      public void run() {
+    executor.execute(() -> {
 
-        readCurrent();
+      readCurrent();
 
-        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+      Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
 
-      }
     });
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
index 2c22d46..aeb01dd 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/providers/pagefeed/FacebookPageFeedProviderIT.java
@@ -44,7 +44,7 @@ public class FacebookPageFeedProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         FacebookPageFeedProvider.main(args);
       } catch( Exception e ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
index ab57ab3..71e825f 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/test/java/org/apache/streams/gplus/test/providers/GPlusUserActivityProviderIT.java
@@ -41,7 +41,7 @@ public class GPlusUserActivityProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         GPlusUserActivityProvider.main(args);
       } catch ( Exception ex ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java
index 660469b..8507db5 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java
@@ -22,6 +22,7 @@ import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaPro
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -45,7 +46,7 @@ public class InstagramRecentMediaProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         InstagramRecentMediaProvider.main(args);
       } catch ( Exception ex ) {
@@ -56,9 +57,9 @@ public class InstagramRecentMediaProviderIT {
     testThread.join(60000);
 
     File out = new File(outfile);
-    assert (out.exists());
-    assert (out.canRead());
-    assert (out.isFile());
+    Assert.assertTrue (out.exists());
+    Assert.assertTrue (out.canRead());
+    Assert.assertTrue (out.isFile());
 
     FileReader outReader = new FileReader(out);
     LineNumberReader outCounter = new LineNumberReader(outReader);
@@ -66,6 +67,6 @@ public class InstagramRecentMediaProviderIT {
     while (outCounter.readLine() != null) {
     }
 
-    assert (outCounter.getLineNumber() >= 1);
+    Assert.assertTrue (outCounter.getLineNumber() >= 1);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java
index f8f0ce3..ba125e7 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java
@@ -22,17 +22,13 @@ import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.FileReader;
 import java.io.LineNumberReader;
 
-import static org.testng.Assert.assertTrue;
-
-/**
- * Created by sblackmon on 10/12/16.
- */
 public class InstagramUserInfoProviderIT {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoProviderIT.class);
@@ -47,7 +43,7 @@ public class InstagramUserInfoProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         InstagramUserInfoProvider.main(args);
       } catch ( Exception ex ) {
@@ -58,16 +54,16 @@ public class InstagramUserInfoProviderIT {
     testThread.join(60000);
 
     File out = new File(outfile);
-    assertTrue (out.exists());
-    assertTrue (out.canRead());
-    assertTrue (out.isFile());
+    Assert.assertTrue (out.exists());
+    Assert.assertTrue (out.canRead());
+    Assert.assertTrue (out.isFile());
 
     FileReader outReader = new FileReader(out);
     LineNumberReader outCounter = new LineNumberReader(outReader);
 
     while (outCounter.readLine() != null) {}
 
-    assertTrue (outCounter.getLineNumber() >= 1);
+    Assert.assertTrue (outCounter.getLineNumber() >= 1);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
index f0f6f76..1affc74 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterFollowingProviderIT.java
@@ -22,15 +22,13 @@ import org.apache.streams.twitter.provider.TwitterFollowingProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.FileReader;
 import java.io.LineNumberReader;
 
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
 public class TwitterFollowingProviderIT {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderIT.class);
@@ -45,7 +43,7 @@ public class TwitterFollowingProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         TwitterFollowingProvider.main(args);
       } catch ( Exception ex ) {
@@ -56,16 +54,16 @@ public class TwitterFollowingProviderIT {
     testThread.join(60000);
 
     File out = new File(outfile);
-    assertTrue (out.exists());
-    assertTrue (out.canRead());
-    assertTrue (out.isFile());
+    Assert.assertTrue (out.exists());
+    Assert.assertTrue (out.canRead());
+    Assert.assertTrue (out.isFile());
 
     FileReader outReader = new FileReader(out);
     LineNumberReader outCounter = new LineNumberReader(outReader);
 
     while (outCounter.readLine() != null) {}
 
-    assertEquals (outCounter.getLineNumber(), 10000);
+    Assert.assertEquals (outCounter.getLineNumber(), 10000);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
index d0a99a9..1048de0 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterStreamProviderIT.java
@@ -46,7 +46,7 @@ public class TwitterStreamProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         TwitterStreamProvider.main(args);
       } catch ( Exception ex ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
index 750c7be..117cba4 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
@@ -22,14 +22,13 @@ import org.apache.streams.twitter.provider.TwitterTimelineProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.io.File;
 import java.io.FileReader;
 import java.io.LineNumberReader;
 
-import static org.testng.Assert.assertEquals;
-
 public class TwitterTimelineProviderIT {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterTimelineProviderIT.class);
@@ -44,7 +43,7 @@ public class TwitterTimelineProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         TwitterTimelineProvider.main(args);
       } catch ( Exception ex ) {
@@ -55,16 +54,16 @@ public class TwitterTimelineProviderIT {
     testThread.join(60000);
 
     File out = new File(outfile);
-    assert (out.exists());
-    assert (out.canRead());
-    assert (out.isFile());
+    Assert.assertTrue (out.exists());
+    Assert.assertTrue (out.canRead());
+    Assert.assertTrue (out.isFile());
 
     FileReader outReader = new FileReader(out);
     LineNumberReader outCounter = new LineNumberReader(outReader);
 
     while (outCounter.readLine() != null) {}
 
-    assertEquals (outCounter.getLineNumber(), 1000);
+    Assert.assertEquals (outCounter.getLineNumber(), 1000);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
index 1748700..61c1031 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterUserInformationProviderIT.java
@@ -31,7 +31,6 @@ import java.io.LineNumberReader;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
-import static org.testng.Assert.assertTrue;
 
 public class TwitterUserInformationProviderIT {
 
@@ -47,7 +46,7 @@ public class TwitterUserInformationProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         TwitterUserInformationProvider.main(args);
       } catch ( Exception ex ) {
@@ -58,9 +57,9 @@ public class TwitterUserInformationProviderIT {
     testThread.join(60000);
 
     File out = new File(outfile);
-    assertTrue(out.exists());
-    assertTrue(out.canRead());
-    assertTrue(out.isFile());
+    Assert.assertTrue (out.exists());
+    Assert.assertTrue (out.canRead());
+    Assert.assertTrue (out.isFile());
 
     FileReader outReader = new FileReader(out);
     LineNumberReader outCounter = new LineNumberReader(outReader);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
index 54c0375..121c3d5 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/com/youtube/provider/YoutubeProviderTest.java
@@ -153,14 +153,11 @@ public class YoutubeProviderTest {
       @Override
       protected Runnable getDataCollector(BackOffStrategy strategy, BlockingQueue<StreamsDatum> queue, YouTube youtube, UserInfo userInfo) {
         final BlockingQueue<StreamsDatum> q = queue;
-        return new Runnable() {
-          @Override
-          public void run() {
-            try {
-              q.put(new StreamsDatum(null));
-            } catch (InterruptedException ie) {
-              fail("Test was interrupted");
-            }
+        return () -> {
+          try {
+            q.put(new StreamsDatum(null));
+          } catch (InterruptedException ie) {
+            fail("Test was interrupted");
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
index 0edbc37..f551d93 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeChannelProviderIT.java
@@ -46,7 +46,7 @@ public class YoutubeChannelProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         YoutubeChannelProvider.main(args);
       } catch ( Exception ex ) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
index d97a119..34442a2 100644
--- a/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
+++ b/streams-contrib/streams-provider-youtube/src/test/java/org/apache/streams/youtube/test/providers/YoutubeUserActivityProviderIT.java
@@ -22,6 +22,7 @@ import com.youtube.provider.YoutubeUserActivityProvider;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import java.io.File;
@@ -45,7 +46,7 @@ public class YoutubeUserActivityProviderIT {
     args[0] = configfile;
     args[1] = outfile;
 
-    Thread testThread = new Thread((Runnable) () -> {
+    Thread testThread = new Thread(() -> {
       try {
         YoutubeUserActivityProvider.main(args);
       } catch ( Exception ex ) {
@@ -56,16 +57,16 @@ public class YoutubeUserActivityProviderIT {
     testThread.join(60000);
 
     File out = new File(outfile);
-    assert (out.exists());
-    assert (out.canRead());
-    assert (out.isFile());
+    Assert.assertTrue (out.exists());
+    Assert.assertTrue (out.canRead());
+    Assert.assertTrue (out.isFile());
 
     FileReader outReader = new FileReader(out);
     LineNumberReader outCounter = new LineNumberReader(outReader);
 
     while (outCounter.readLine() != null) {}
 
-    assert (outCounter.getLineNumber() >= 250);
+    Assert.assertTrue(outCounter.getLineNumber() >= 250);
 
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
index 38bda24..64dd72d 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * A fixed ThreadPoolExecutor that will shutdown a stream upon a thread ending execution due to an unhandled throwable.
- * @see {@link java.util.concurrent.ThreadPoolExecutor}
+ * @see java.util.concurrent.ThreadPoolExecutor
  */
 public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor {
 
@@ -44,7 +44,7 @@ public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadP
    * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable
    */
   public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) {
-    super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+    super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
     this.streamBuilder = streamBuilder;
     this.isStoped = false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index d37209c..06e2762 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -334,12 +334,7 @@ public class LocalStreamBuilderTest extends RandomizedTest {
       builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
           .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
           .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");
-      service.submit(new Runnable() {
-        @Override
-        public void run() {
-          builder.start();
-        }
-      });
+      service.submit(builder::start);
       //Let streams spin up threads and start to process
       Thread.sleep(500);
       builder.stop();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/4ac71f48/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
index e3b608d..0048153 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java
@@ -23,8 +23,6 @@ import org.apache.streams.util.ComponentUtils;
 
 import org.junit.After;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -55,22 +53,16 @@ public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
   public void testShutDownOnException() {
     LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
     final AtomicBoolean isShutdown = new AtomicBoolean(false);
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-        isShutdown.set(true);
-        return null;
-      }
+    doAnswer(invocationOnMock -> {
+      isShutdown.set(true);
+      return null;
     }).when(sb).stop();
 
     final CountDownLatch latch = new CountDownLatch(1);
 
-    Runnable runnable = new Runnable() {
-      @Override
-      public void run() {
-        latch.countDown();
-        throw new RuntimeException("Testing Throwable Handling!");
-      }
+    Runnable runnable = () -> {
+      latch.countDown();
+      throw new RuntimeException("Testing Throwable Handling!");
     };
 
     ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
@@ -94,22 +86,14 @@ public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest {
   public void testNormalExecution() {
     LocalStreamBuilder sb = mock(LocalStreamBuilder.class);
     final AtomicBoolean isShutdown = new AtomicBoolean(false);
-    doAnswer(new Answer() {
-      @Override
-      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-        isShutdown.set(true);
-        return null;
-      }
+    doAnswer(invocationOnMock -> {
+      isShutdown.set(true);
+      return null;
     }).when(sb).stop();
 
     final CountDownLatch latch = new CountDownLatch(1);
 
-    Runnable runnable = new Runnable() {
-      @Override
-      public void run() {
-        latch.countDown();
-      }
-    };
+    Runnable runnable = latch::countDown;
 
     ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb);
     executor.execute(runnable);

Reply via email to