This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d096931cecf9f83e414cc48e2dc140334e79a705
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Wed Jul 31 08:56:44 2019 +0200

    CAMEL-13801 - Fixed CS for Camel-RXJava2
---
 .../rxjava2/engine/RxJavaStreamsServiceTest.java   | 131 ++++++---------------
 1 file changed, 35 insertions(+), 96 deletions(-)

diff --git a/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java b/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java
index 567ab8a..463d831 100644
--- a/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java
+++ b/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java
@@ -38,9 +38,9 @@ import org.junit.Test;
 import org.reactivestreams.Publisher;
 
 public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
-       
-       @BindToRegistry("hello")
-       private SampleBean bean = new SampleBean();
+
+    @BindToRegistry("hello")
+    private SampleBean bean = new SampleBean();
 
     public static class SampleBean {
         public String hello(String name) {
@@ -56,8 +56,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
     public void testFromStreamDirect() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("direct:reactive")
-                    .to("reactive-streams:numbers");
+                from("direct:reactive").to("reactive-streams:numbers");
             }
         });
 
@@ -66,9 +65,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
 
         AtomicInteger value = new AtomicInteger(0);
 
-        Flowable.fromPublisher(crs.fromStream("numbers", Integer.class))
-            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
-            .subscribe();
+        Flowable.fromPublisher(crs.fromStream("numbers", Integer.class)).doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).subscribe();
 
         template.sendBody("direct:reactive", 1);
         template.sendBody("direct:reactive", 2);
@@ -80,10 +77,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:tick?period=5&repeatCount=30")
-                    .setBody()
-                        .header(Exchange.TIMER_COUNTER)
-                    .to("reactive-streams:tick");
+                from("timer:tick?period=5&repeatCount=30").setBody().header(Exchange.TIMER_COUNTER).to("reactive-streams:tick");
             }
         });
 
@@ -91,9 +85,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         final CountDownLatch latch = new CountDownLatch(num);
         final AtomicInteger value = new AtomicInteger(0);
 
-        Flowable.fromPublisher(crs.fromStream("tick", Integer.class))
-            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
-            .doOnNext(n -> latch.countDown())
+        Flowable.fromPublisher(crs.fromStream("tick", Integer.class)).doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).doOnNext(n -> latch.countDown())
             .subscribe();
 
         context.start();
@@ -109,20 +101,15 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:reactive")
-                    .to("reactive-streams:direct");
+                from("direct:reactive").to("reactive-streams:direct");
             }
         });
 
         CountDownLatch latch1 = new CountDownLatch(2);
-        Flowable.fromPublisher(crs.fromStream("direct", Integer.class))
-            .doOnNext(res -> latch1.countDown())
-            .subscribe();
+        Flowable.fromPublisher(crs.fromStream("direct", Integer.class)).doOnNext(res -> latch1.countDown()).subscribe();
 
         CountDownLatch latch2 = new CountDownLatch(2);
-        Flowable.fromPublisher(crs.fromStream("direct", Integer.class))
-            .doOnNext(res -> latch2.countDown())
-            .subscribe();
+        Flowable.fromPublisher(crs.fromStream("direct", Integer.class)).doOnNext(res -> latch2.countDown()).subscribe();
 
         template.sendBody("direct:reactive", 1);
         template.sendBody("direct:reactive", 2);
@@ -136,9 +123,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("timer:tick?period=50")
-                    .setBody().header(Exchange.TIMER_COUNTER)
-                    .to("reactive-streams:tick");
+                from("timer:tick?period=50").setBody().header(Exchange.TIMER_COUNTER).to("reactive-streams:tick");
             }
         });
 
@@ -178,11 +163,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         AtomicInteger value = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(3);
 
-        Flowable.fromPublisher(timer)
-            .map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class))
-            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue()))
-            .doOnNext(res -> latch.countDown())
-            .subscribe();
+        Flowable.fromPublisher(timer).map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class))
+            .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).doOnNext(res -> latch.countDown()).subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
     }
@@ -197,30 +179,19 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:source")
-                    .to("direct:stream")
-                    .setBody()
-                        .simple("after stream: ${body}");
+                from("direct:source").to("direct:stream").setBody().simple("after stream: ${body}");
             }
         });
 
-        crs.process("direct:stream",
-            publisher ->
-                Flowable.fromPublisher(publisher)
-                    .map(e -> {
-                        int i = e.getIn().getBody(Integer.class);
-                        e.getOut().setBody(-i);
+        crs.process("direct:stream", publisher -> Flowable.fromPublisher(publisher).map(e -> {
+            int i = e.getIn().getBody(Integer.class);
+            e.getOut().setBody(-i);
 
-                        return e;
-                    }
-                )
-        );
+            return e;
+        }));
 
         for (int i = 1; i <= 3; i++) {
-            Assert.assertEquals(
-                "after stream: " + (-i),
-                template.requestBody("direct:source", i, String.class)
-            );
+            Assert.assertEquals("after stream: " + (-i), template.requestBody("direct:source", i, String.class));
         }
     }
 
@@ -230,24 +201,14 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:source")
-                    .to("direct:stream")
-                    .setBody()
-                        .simple("after stream: ${body}");
+                from("direct:source").to("direct:stream").setBody().simple("after stream: ${body}");
             }
         });
 
-        crs.process("direct:stream",
-            Integer.class,
-            publisher ->
-                Flowable.fromPublisher(publisher).map(Math::negateExact)
-        );
+        crs.process("direct:stream", Integer.class, publisher -> Flowable.fromPublisher(publisher).map(Math::negateExact));
 
         for (int i = 1; i <= 3; i++) {
-            Assert.assertEquals(
-                "after stream: " + (-i),
-                template.requestBody("direct:source", i, String.class)
-            );
+            Assert.assertEquals("after stream: " + (-i), template.requestBody("direct:source", i, String.class));
         }
     }
 
@@ -259,8 +220,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
     public void testToStream() throws Exception {
         context.addRoutes(new RouteBuilder() {
             public void configure() {
-                from("reactive-streams:reactive")
-                    .setBody().constant("123");
+                from("reactive-streams:reactive").setBody().constant("123");
             }
         });
 
@@ -284,11 +244,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         AtomicInteger value = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
 
-        Flowable.just(1, 2, 3)
-            .flatMap(e -> crs.to("bean:hello", e, String.class))
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
-            .doOnNext(res -> latch.countDown())
-            .subscribe();
+        Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e, String.class)).doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
+            .doOnNext(res -> latch.countDown()).subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
     }
@@ -300,13 +257,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         AtomicInteger value = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
 
-        Flowable.just(1, 2, 3)
-            .flatMap(e -> crs.to("bean:hello", e))
-            .map(e -> e.getOut())
-            .map(e -> e.getBody(String.class))
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
-            .doOnNext(res -> latch.countDown())
-            .subscribe();
+        Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(e -> e.getOut()).map(e -> e.getBody(String.class))
+            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
     }
@@ -319,11 +271,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         CountDownLatch latch = new CountDownLatch(1);
         Function<Object, Publisher<String>> fun = crs.to("bean:hello", String.class);
 
-        Flowable.just(1, 2, 3)
-            .flatMap(fun::apply)
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
-            .doOnNext(res -> latch.countDown())
-            .subscribe();
+        Flowable.just(1, 2, 3).flatMap(fun::apply).doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
     }
@@ -336,13 +284,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         CountDownLatch latch = new CountDownLatch(1);
         Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello");
 
-        Flowable.just(1, 2, 3)
-            .flatMap(fun::apply)
-            .map(e -> e.getOut())
-            .map(e -> e.getBody(String.class))
-            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res))
-            .doOnNext(res -> latch.countDown())
-            .subscribe();
+        Flowable.just(1, 2, 3).flatMap(fun::apply).map(e -> e.getOut()).map(e -> e.getBody(String.class))
+            .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe();
 
         Assert.assertTrue(latch.await(2, TimeUnit.SECONDS));
     }
@@ -357,13 +300,11 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:reactor")
-                    .to("mock:result");
+                from("direct:reactor").to("mock:result");
             }
         });
 
-        Flowable.just(1, 2, 3)
-            .subscribe(crs.subscriber("direct:reactor", Integer.class));
+        Flowable.just(1, 2, 3).subscribe(crs.subscriber("direct:reactor", Integer.class));
 
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(3);
@@ -384,10 +325,8 @@ public class RxJavaStreamsServiceTest extends 
RxJavaStreamsServiceTestSupport {
         context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:one")
-                    .to("reactive-streams:stream");
-                from("direct:two")
-                    .to("reactive-streams:stream");
+                from("direct:one").to("reactive-streams:stream");
+                from("direct:two").to("reactive-streams:stream");
             }
         });
 

Reply via email to