Repository: ignite
Updated Branches:
  refs/heads/ignite-1790 748e88bda -> d467e26d2


IGNITE-1790 Camel streamer: add 1 unit test and cleanup code.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d467e26d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d467e26d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d467e26d

Branch: refs/heads/ignite-1790
Commit: d467e26d29635e76df0ad0f237b23a2b65bdfef5
Parents: 748e88b
Author: Raul Kripalani <[email protected]>
Authored: Wed Oct 28 11:23:07 2015 +0000
Committer: Raul Kripalani <[email protected]>
Committed: Wed Oct 28 11:23:07 2015 +0000

----------------------------------------------------------------------
 .../ignite/stream/camel/CamelStreamer.java      |   2 +-
 .../stream/camel/IgniteCamelStreamerTest.java   | 100 +++++++++++++------
 .../camel/IgniteCamelStreamerTestSuite.java     |   2 +-
 .../src/test/resources/camel.test.properties    |   1 +
 4 files changed, 75 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java 
b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
index 6bca964..38b16f3 100644
--- 
a/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
+++ 
b/modules/camel/src/main/java/org/apache/ignite/stream/camel/CamelStreamer.java
@@ -39,7 +39,7 @@ import org.apache.ignite.stream.StreamSingleTupleExtractor;
 /**
  * This streamer consumes messages from an Apache Camel consumer endpoint and 
feeds them into an Ignite data streamer.
  *
- * The only mandatory property is {@link #endpointUri} and the appropriate 
stream tuple extractor
+ * The only mandatory properties are {@link #endpointUri} and the appropriate 
stream tuple extractor
  * (either {@link StreamSingleTupleExtractor} or {@link 
StreamMultipleTupleExtractor}).
  *
  * The user can also provide a custom {@link CamelContext} in case they want 
to attach custom components, a

http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
 
b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
index 8786b61..84d5154 100644
--- 
a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
+++ 
b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTest.java
@@ -33,6 +33,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.properties.PropertiesComponent;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.LifecycleStrategySupport;
 import org.apache.ignite.Ignite;
@@ -86,6 +87,7 @@ public class IgniteCamelStreamerTest extends 
GridCommonAbstractTest {
     /** The OkHttpClient. */
     private OkHttpClient httpClient = new OkHttpClient();
 
+    // Initialize the test data.
     static {
         for (int i = 0; i < 100; i++)
             TEST_DATA.put(i, "v" + i);
@@ -103,6 +105,7 @@ public class IgniteCamelStreamerTest extends 
GridCommonAbstractTest {
         // find an available local port
         try (ServerSocket ss = new ServerSocket(0)) {
             int port = ss.getLocalPort();
+
             url = "http://localhost:" + port + "/ignite";
         }
 
@@ -227,12 +230,49 @@ public class IgniteCamelStreamerTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Creates a Camel streamer.
-     *
      * @throws Exception
      */
+    public void testUserSpecifiedCamelContextWithPropertyPlaceholders() throws 
Exception {
+        // Create a CamelContext with a custom property placeholder.
+        CamelContext context = new DefaultCamelContext();
+
+        PropertiesComponent pc = new 
PropertiesComponent("camel.test.properties");
+
+        context.addComponent("properties", pc);
+
+        // Replace the context path in the test URL with the property 
placeholder.
+        url = url.replaceAll("/ignite", "{{test.contextPath}}");
+
+        // Recreate the Camel streamer with the new URL.
+        streamer = createCamelStreamer(dataStreamer);
+
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setCamelContext(context);
+
+        // Subscribe to cache PUT events.
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // Action time.
+        streamer.start();
+
+        // Before sending the messages, get the actual URL after the property 
placeholder was resolved,
+        // stripping the jetty: prefix from it.
+        url = 
streamer.getCamelContext().getEndpoints().iterator().next().getEndpointUri().replaceAll("jetty:",
 "");
+
+        // Send messages.
+        sendMessages(0, 50, false);
+
+        // Assertions.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    /**
+     * Creates a Camel streamer.
+     */
     private CamelStreamer<Integer, String> 
createCamelStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
         CamelStreamer<Integer, String> streamer = new CamelStreamer<>();
+
         streamer.setIgnite(grid());
         streamer.setStreamer(dataStreamer);
         streamer.setEndpointUri("jetty:" + url);
@@ -244,21 +284,7 @@ public class IgniteCamelStreamerTest extends 
GridCommonAbstractTest {
     }
 
     /**
-     * Returns a {@link StreamSingleTupleExtractor} for testing.
-     *
-     * @throws Exception
-     */
-    public static StreamSingleTupleExtractor<Exchange, Integer, String> 
singleTupleExtractor() {
-        return new StreamSingleTupleExtractor<Exchange, Integer, String>() {
-            @Override public Map.Entry<Integer, String> extract(Exchange 
exchange) {
-                List<String> s = 
Splitter.on(",").splitToList(exchange.getIn().getBody(String.class));
-                return new GridMapEntry<>(Integer.parseInt(s.get(0)), 
s.get(1));
-            }
-        };
-    }
-
-    /**
-     * @throws Exception
+     * @throws IOException
      * @return HTTP response payloads.
      */
     private List<String> sendMessages(int fromIdx, int cnt, boolean 
singleMessage) throws IOException {
@@ -298,60 +324,78 @@ public class IgniteCamelStreamerTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Returns a {@link StreamSingleTupleExtractor} for testing.
+     */
+    private static StreamSingleTupleExtractor<Exchange, Integer, String> 
singleTupleExtractor() {
+        return new StreamSingleTupleExtractor<Exchange, Integer, String>() {
+            @Override public Map.Entry<Integer, String> extract(Exchange 
exchange) {
+                List<String> s = 
Splitter.on(",").splitToList(exchange.getIn().getBody(String.class));
+
+                return new GridMapEntry<>(Integer.parseInt(s.get(0)), 
s.get(1));
+            }
+        };
+    }
+
+    /**
      * Returns a {@link StreamMultipleTupleExtractor} for testing.
-     *
-     * @throws Exception
      */
-    public static StreamMultipleTupleExtractor<Exchange, Integer, String> 
multipleTupleExtractor() {
+    private static StreamMultipleTupleExtractor<Exchange, Integer, String> 
multipleTupleExtractor() {
         return new StreamMultipleTupleExtractor<Exchange, Integer, String>() {
             @Override public Map<Integer, String> extract(Exchange exchange) {
                 final Map<String, String> map = Splitter.on("\n")
                     .omitEmptyStrings()
                     .withKeyValueSeparator(",")
-                    .split(new String(exchange.getIn().getBody(String.class)));
+                    .split(exchange.getIn().getBody(String.class));
+
                 final Map<Integer, String> answer = new HashMap<>();
+
                 F.forEach(map.keySet(), new IgniteInClosure<String>() {
                     @Override public void apply(String s) {
                         answer.put(Integer.parseInt(s), map.get(s));
                     }
                 });
+
                 return answer;
             }
         };
     }
 
     /**
-     * @throws Exception
+     * Subscribe to cache put events.
      */
     private CountDownLatch subscribeToPutEvents(int expect) {
         Ignite ignite = grid();
 
         // Listen to cache PUT events and expect as many as messages as test 
data items
         final CountDownLatch latch = new CountDownLatch(expect);
-        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> 
callback = new IgniteBiPredicate<UUID, CacheEvent>() {
+        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> 
callback =
+            new IgniteBiPredicate<UUID, CacheEvent>() {
             @Override public boolean apply(UUID uuid, CacheEvent evt) {
                 latch.countDown();
+
                 return true;
             }
         };
 
-        remoteListener = 
ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, 
null, EVT_CACHE_OBJECT_PUT);
+        remoteListener = ignite.events(ignite.cluster().forCacheNodes(null))
+            .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+
         return latch;
     }
 
     /**
-     * @throws Exception
+     * Assert a given number of cache entries have been loaded.
      */
-    private void assertCacheEntriesLoaded(int count) {
+    private void assertCacheEntriesLoaded(int cnt) {
         // get the cache and check that the entries are present
         IgniteCache<Integer, String> cache = grid().cache(null);
 
         // for each key from 0 to count from the TEST_DATA (ordered by key), 
check that the entry is present in cache
-        for (Integer key : new ArrayList<>(new 
TreeSet<>(TEST_DATA.keySet())).subList(0, count))
+        for (Integer key : new ArrayList<>(new 
TreeSet<>(TEST_DATA.keySet())).subList(0, cnt))
             assertEquals(TEST_DATA.get(key), cache.get(key));
 
         // assert that the cache exactly the specified amount of elements
-        assertEquals(count, cache.size(CachePeekMode.ALL));
+        assertEquals(cnt, cache.size(CachePeekMode.ALL));
 
         // remove the event listener
         
grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);

http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
 
b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
index 26aaab1..56870fb 100644
--- 
a/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
+++ 
b/modules/camel/src/test/java/org/apache/ignite/stream/camel/IgniteCamelStreamerTestSuite.java
@@ -29,4 +29,4 @@ import org.junit.runners.Suite;
 })
 public class IgniteCamelStreamerTestSuite {
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/d467e26d/modules/camel/src/test/resources/camel.test.properties
----------------------------------------------------------------------
diff --git a/modules/camel/src/test/resources/camel.test.properties 
b/modules/camel/src/test/resources/camel.test.properties
new file mode 100644
index 0000000..ca9969a
--- /dev/null
+++ b/modules/camel/src/test/resources/camel.test.properties
@@ -0,0 +1 @@
+test.contextPath = /ignite-properties

Reply via email to