Repository: ignite Updated Branches: refs/heads/ignite-1790 748e88bda -> d467e26d2
IGNITE-1790 Camel streamer: add 1 unit test and cleanup code. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d467e26d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d467e26d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d467e26d Branch: refs/heads/ignite-1790 Commit: d467e26d29635e76df0ad0f237b23a2b65bdfef5 Parents: 748e88b Author: Raul Kripalani <[email protected]> Authored: Wed Oct 28 11:23:07 2015 +0000 Committer: Raul Kripalani <[email protected]> Committed: Wed Oct 28 11:23:07 2015 +0000 ---------------------------------------------------------------------- .../ignite/stream/camel/CamelStreamer.java | 2 +- .../stream/camel/IgniteCamelStreamerTest.java | 100 +++++++++++++------ .../camel/IgniteCamelStreamerTestSuite.java | 2 +- .../src/test/resources/camel.test.properties | 1 + 4 files changed, 75 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java ---------------------------------------------------------------------- diff --git a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java index 6bca964..38b16f3 100644 --- a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java +++ b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java @@ -39,7 +39,7 @@ import org.apache.ignite.stream.StreamSingleTupleExtractor; /** * This streamer consumes messages from an Apache Camel consumer endpoint and feeds them into an Ignite data streamer. * - * The only mandatory property is {@link #endpointUri} and the appropriate stream tuple extractor + * The only mandatory properties are {@link #endpointUri} and the appropriate stream tuple extractor * (either {@link StreamSingleTupleExtractor} or {@link StreamMultipleTupleExtractor)}. * * The user can also provide a custom {@link CamelContext} in case they want to attach custom components, a http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java index 8786b61..84d5154 100644 --- a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java +++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java @@ -33,6 +33,7 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.properties.PropertiesComponent; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.support.LifecycleStrategySupport; import org.apache.ignite.Ignite; @@ -86,6 +87,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest { /** The OkHttpClient. */ private OkHttpClient httpClient = new OkHttpClient(); + // Initialize the test data. static { for (int i = 0; i < 100; i++) TEST_DATA.put(i, "v" + i); @@ -103,6 +105,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest { // find an available local port try (ServerSocket ss = new ServerSocket(0)) { int port = ss.getLocalPort(); + url = "http://localhost:" + port + "/ignite"; } @@ -227,12 +230,49 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest { } /** - * Creates a Camel streamer. - * * @throws Exception */ + public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws Exception { + // Create a CamelContext with a custom property placeholder. + CamelContext context = new DefaultCamelContext(); + + PropertiesComponent pc = new PropertiesComponent("camel.test.properties"); + + context.addComponent("properties", pc); + + // Replace the context path in the test URL with the property placeholder. + url = url.replaceAll("/ignite", "{{test.contextPath}}"); + + // Recreate the Camel streamer with the new URL. + streamer = createCamelStreamer(dataStreamer); + + streamer.setSingleTupleExtractor(singleTupleExtractor()); + streamer.setCamelContext(context); + + // Subscribe to cache PUT events. + CountDownLatch latch = subscribeToPutEvents(50); + + // Action time. + streamer.start(); + + // Before sending the messages, get the actual URL after the property placeholder was resolved, + // stripping the jetty: prefix from it. + url = streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:", ""); + + // Send messages. + sendMessages(0, 50, false); + + // Assertions. + assertTrue(latch.await(10, TimeUnit.SECONDS)); + assertCacheEntriesLoaded(50); + } + + /** + * Creates a Camel streamer. + */ private CamelStreamer<Integer, String> createCamelStreamer(IgniteDataStreamer<Integer, String> dataStreamer) { CamelStreamer<Integer, String> streamer = new CamelStreamer<>(); + streamer.setIgnite(grid()); streamer.setStreamer(dataStreamer); streamer.setEndpointUri("jetty:" + url); @@ -244,21 +284,7 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest { } /** - * Returns a {@link StreamSingleTupleExtractor} for testing. - * - * @throws Exception - */ - public static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor() { - return new StreamSingleTupleExtractor<Exchange, Integer, String>() { - @Override public Map.Entry<Integer, String> extract(Exchange exchange) { - List<String> s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class)); - return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); - } - }; - } - - /** - * @throws Exception + * @throws IOException * @return HTTP response payloads. */ private List<String> sendMessages(int fromIdx, int cnt, boolean singleMessage) throws IOException { @@ -298,60 +324,78 @@ public class IgniteCamelStreamerTest extends GridCommonAbstractTest { } /** + * Returns a {@link StreamSingleTupleExtractor} for testing. + */ + private static StreamSingleTupleExtractor<Exchange, Integer, String> singleTupleExtractor() { + return new StreamSingleTupleExtractor<Exchange, Integer, String>() { + @Override public Map.Entry<Integer, String> extract(Exchange exchange) { + List<String> s = Splitter.on(",").splitToList(exchange.getIn().getBody(String.class)); + + return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1)); + } + }; + } + + /** * Returns a {@link StreamMultipleTupleExtractor} for testing. - * - * @throws Exception */ - public static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor() { + private static StreamMultipleTupleExtractor<Exchange, Integer, String> multipleTupleExtractor() { return new StreamMultipleTupleExtractor<Exchange, Integer, String>() { @Override public Map<Integer, String> extract(Exchange exchange) { final Map<String, String> map = Splitter.on("\n") .omitEmptyStrings() .withKeyValueSeparator(",") - .split(new String(exchange.getIn().getBody(String.class))); + .split(exchange.getIn().getBody(String.class)); + final Map<Integer, String> answer = new HashMap<>(); + F.forEach(map.keySet(), new IgniteInClosure<String>() { @Override public void apply(String s) { answer.put(Integer.parseInt(s), map.get(s)); } }); + return answer; } }; } /** - * @throws Exception + * Subscribe to cache put events. */ private CountDownLatch subscribeToPutEvents(int expect) { Ignite ignite = grid(); // Listen to cache PUT events and expect as many as messages as test data items final CountDownLatch latch = new CountDownLatch(expect); - @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() { + @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = + new IgniteBiPredicate<UUID, CacheEvent>() { @Override public boolean apply(UUID uuid, CacheEvent evt) { latch.countDown(); + return true; } }; - remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)) + .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT); + return latch; } /** - * @throws Exception + * Assert a given number of cache entries have been loaded. */ - private void assertCacheEntriesLoaded(int count) { + private void assertCacheEntriesLoaded(int cnt) { // get the cache and check that the entries are present IgniteCache<Integer, String> cache = grid().cache(null); // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache - for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) + for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, cnt)) assertEquals(TEST_DATA.get(key), cache.get(key)); // assert that the cache exactly the specified amount of elements - assertEquals(count, cache.size(CachePeekMode.ALL)); + assertEquals(cnt, cache.size(CachePeekMode.ALL)); // remove the event listener grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener); http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java index 26aaab1..56870fb 100644 --- a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java +++ b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java @@ -29,4 +29,4 @@ import org.junit.runners.Suite; }) public class IgniteCamelStreamerTestSuite { -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/test/resources/camel.test.properties ---------------------------------------------------------------------- diff --git a/modules/camel/src/test/resources/camel.test.properties b/modules/camel/src/test/resources/camel.test.properties new file mode 100644 index 0000000..ca9969a --- /dev/null +++ b/modules/camel/src/test/resources/camel.test.properties @@ -0,0 +1 @@ +test.contextPath = /ignite-properties
