This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit d096931cecf9f83e414cc48e2dc140334e79a705 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Jul 31 08:56:44 2019 +0200 CAMEL-13801 - Fixed CS for Camel-RXJava2 --- .../rxjava2/engine/RxJavaStreamsServiceTest.java | 131 ++++++--------------- 1 file changed, 35 insertions(+), 96 deletions(-) diff --git a/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java b/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java index 567ab8a..463d831 100644 --- a/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java +++ b/components/camel-rxjava2/src/test/java/org/apache/camel/component/rxjava2/engine/RxJavaStreamsServiceTest.java @@ -38,9 +38,9 @@ import org.junit.Test; import org.reactivestreams.Publisher; public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { - - @BindToRegistry("hello") - private SampleBean bean = new SampleBean(); + + @BindToRegistry("hello") + private SampleBean bean = new SampleBean(); public static class SampleBean { public String hello(String name) { @@ -56,8 +56,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { public void testFromStreamDirect() throws Exception { context.addRoutes(new RouteBuilder() { public void configure() { - from("direct:reactive") - .to("reactive-streams:numbers"); + from("direct:reactive").to("reactive-streams:numbers"); } }); @@ -66,9 +65,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { AtomicInteger value = new AtomicInteger(0); - Flowable.fromPublisher(crs.fromStream("numbers", Integer.class)) - .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())) - .subscribe(); + Flowable.fromPublisher(crs.fromStream("numbers", Integer.class)).doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).subscribe(); template.sendBody("direct:reactive", 1); template.sendBody("direct:reactive", 2); @@ -80,10 +77,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("timer:tick?period=5&repeatCount=30") - .setBody() - .header(Exchange.TIMER_COUNTER) - .to("reactive-streams:tick"); + from("timer:tick?period=5&repeatCount=30").setBody().header(Exchange.TIMER_COUNTER).to("reactive-streams:tick"); } }); @@ -91,9 +85,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { final CountDownLatch latch = new CountDownLatch(num); final AtomicInteger value = new AtomicInteger(0); - Flowable.fromPublisher(crs.fromStream("tick", Integer.class)) - .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())) - .doOnNext(n -> latch.countDown()) + Flowable.fromPublisher(crs.fromStream("tick", Integer.class)).doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).doOnNext(n -> latch.countDown()) .subscribe(); context.start(); @@ -109,20 +101,15 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:reactive") - .to("reactive-streams:direct"); + from("direct:reactive").to("reactive-streams:direct"); } }); CountDownLatch latch1 = new CountDownLatch(2); - Flowable.fromPublisher(crs.fromStream("direct", Integer.class)) - .doOnNext(res -> latch1.countDown()) - .subscribe(); + Flowable.fromPublisher(crs.fromStream("direct", Integer.class)).doOnNext(res -> latch1.countDown()).subscribe(); CountDownLatch latch2 = new CountDownLatch(2); - Flowable.fromPublisher(crs.fromStream("direct", Integer.class)) - .doOnNext(res -> latch2.countDown()) - .subscribe(); + Flowable.fromPublisher(crs.fromStream("direct", Integer.class)).doOnNext(res -> latch2.countDown()).subscribe(); template.sendBody("direct:reactive", 1); template.sendBody("direct:reactive", 2); @@ -136,9 +123,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("timer:tick?period=50") - .setBody().header(Exchange.TIMER_COUNTER) - .to("reactive-streams:tick"); + from("timer:tick?period=50").setBody().header(Exchange.TIMER_COUNTER).to("reactive-streams:tick"); } }); @@ -178,11 +163,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { AtomicInteger value = new AtomicInteger(0); CountDownLatch latch = new CountDownLatch(3); - Flowable.fromPublisher(timer) - .map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class)) - .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())) - .doOnNext(res -> latch.countDown()) - .subscribe(); + Flowable.fromPublisher(timer).map(exchange -> ExchangeHelper.getHeaderOrProperty(exchange, Exchange.TIMER_COUNTER, Integer.class)) + .doOnNext(res -> Assert.assertEquals(value.incrementAndGet(), res.intValue())).doOnNext(res -> latch.countDown()).subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); } @@ -197,30 +179,19 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:source") - .to("direct:stream") - .setBody() - .simple("after stream: ${body}"); + from("direct:source").to("direct:stream").setBody().simple("after stream: ${body}"); } }); - crs.process("direct:stream", - publisher -> - Flowable.fromPublisher(publisher) - .map(e -> { - int i = e.getIn().getBody(Integer.class); - e.getOut().setBody(-i); + crs.process("direct:stream", publisher -> Flowable.fromPublisher(publisher).map(e -> { + int i = e.getIn().getBody(Integer.class); + e.getOut().setBody(-i); - return e; - } - ) - ); + return e; + })); for (int i = 1; i <= 3; i++) { - Assert.assertEquals( - "after stream: " + (-i), - template.requestBody("direct:source", i, String.class) - ); + Assert.assertEquals("after stream: " + (-i), template.requestBody("direct:source", i, String.class)); } } @@ -230,24 +201,14 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:source") - .to("direct:stream") - .setBody() - .simple("after stream: ${body}"); + from("direct:source").to("direct:stream").setBody().simple("after stream: ${body}"); } }); - crs.process("direct:stream", - Integer.class, - publisher -> - Flowable.fromPublisher(publisher).map(Math::negateExact) - ); + crs.process("direct:stream", Integer.class, publisher -> Flowable.fromPublisher(publisher).map(Math::negateExact)); for (int i = 1; i <= 3; i++) { - Assert.assertEquals( - "after stream: " + (-i), - template.requestBody("direct:source", i, String.class) - ); + Assert.assertEquals("after stream: " + (-i), template.requestBody("direct:source", i, String.class)); } } @@ -259,8 +220,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { public void testToStream() throws Exception { context.addRoutes(new RouteBuilder() { public void configure() { - from("reactive-streams:reactive") - .setBody().constant("123"); + from("reactive-streams:reactive").setBody().constant("123"); } }); @@ -284,11 +244,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { AtomicInteger value = new AtomicInteger(0); CountDownLatch latch = new CountDownLatch(1); - Flowable.just(1, 2, 3) - .flatMap(e -> crs.to("bean:hello", e, String.class)) - .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)) - .doOnNext(res -> latch.countDown()) - .subscribe(); + Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e, String.class)).doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)) + .doOnNext(res -> latch.countDown()).subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); } @@ -300,13 +257,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { AtomicInteger value = new AtomicInteger(0); CountDownLatch latch = new CountDownLatch(1); - Flowable.just(1, 2, 3) - .flatMap(e -> crs.to("bean:hello", e)) - .map(e -> e.getOut()) - .map(e -> e.getBody(String.class)) - .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)) - .doOnNext(res -> latch.countDown()) - .subscribe(); + Flowable.just(1, 2, 3).flatMap(e -> crs.to("bean:hello", e)).map(e -> e.getOut()).map(e -> e.getBody(String.class)) + .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); } @@ -319,11 +271,7 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { CountDownLatch latch = new CountDownLatch(1); Function<Object, Publisher<String>> fun = crs.to("bean:hello", String.class); - Flowable.just(1, 2, 3) - .flatMap(fun::apply) - .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)) - .doOnNext(res -> latch.countDown()) - .subscribe(); + Flowable.just(1, 2, 3).flatMap(fun::apply).doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); } @@ -336,13 +284,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { CountDownLatch latch = new CountDownLatch(1); Function<Object, Publisher<Exchange>> fun = crs.to("bean:hello"); - Flowable.just(1, 2, 3) - .flatMap(fun::apply) - .map(e -> e.getOut()) - .map(e -> e.getBody(String.class)) - .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)) - .doOnNext(res -> latch.countDown()) - .subscribe(); + Flowable.just(1, 2, 3).flatMap(fun::apply).map(e -> e.getOut()).map(e -> e.getBody(String.class)) + .doOnNext(res -> Assert.assertEquals("Hello " + value.incrementAndGet(), res)).doOnNext(res -> latch.countDown()).subscribe(); Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); } @@ -357,13 +300,11 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:reactor") - .to("mock:result"); + from("direct:reactor").to("mock:result"); } }); - Flowable.just(1, 2, 3) - .subscribe(crs.subscriber("direct:reactor", Integer.class)); + Flowable.just(1, 2, 3).subscribe(crs.subscriber("direct:reactor", Integer.class)); MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(3); @@ -384,10 +325,8 @@ public class RxJavaStreamsServiceTest extends RxJavaStreamsServiceTestSupport { context.addRoutes(new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:one") - .to("reactive-streams:stream"); - from("direct:two") - .to("reactive-streams:stream"); + from("direct:one").to("reactive-streams:stream"); + from("direct:two").to("reactive-streams:stream"); } });