ignite-430 IgniteSocketStreamer to stream data from TCP socket.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0de79233 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0de79233 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0de79233 Branch: refs/heads/ignite-430 Commit: 0de79233da7f19863f9f7aabb6f55c3324db18c4 Parents: ea1d621 Author: agura <ag...@gridgain.com> Authored: Mon Mar 30 02:32:49 2015 +0300 Committer: agura <ag...@gridgain.com> Committed: Mon Mar 30 02:32:49 2015 +0300 ---------------------------------------------------------------------- .../streaming/SocketStreamerExample.java | 101 ++++----- .../streaming/TextSocketStreamerExample.java | 113 +++++------ .../ignite/streaming/IgniteSocketStreamer.java | 10 +- .../streaming/IgniteTextSocketStreamer.java | 4 +- .../org/apache/ignite/streaming/Receiver.java | 184 +++++++++++++++++ .../apache/ignite/streaming/StreamReceiver.java | 167 --------------- .../streaming/IgniteSocketStreamerTest.java | 7 +- .../streaming/IgniteTextSocketStreamerTest.java | 4 +- .../apache/ignite/streaming/ReceiverTest.java | 203 +++++++++++++++++++ .../ignite/streaming/StreamReceiverTest.java | 193 ------------------ 10 files changed, 511 insertions(+), 475 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java index cf24455..292bc28 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java @@ -18,32 +18,39 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streaming.*; - -import java.io.*; -import java.net.*; -import java.util.*; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.examples.ExamplesUtils; +import org.apache.ignite.examples.streaming.numbers.CacheConfig; +import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.streaming.IgniteSocketStreamer; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Map; +import java.util.Random; /** - * Demonstrates how cache can be populated with data utilizing {@link IgniteSocketStreamer} API. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * <li>Start streaming using {@link SocketStreamerExample}.</li> + * </ul> * <p> - * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will - * start node with {@code examples/config/example-cache.xml} configuration. + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. */ public class SocketStreamerExample { - /** Cache name. */ - private static final String CACHE_NAME = "partitioned"; - - /** Number of entries to load. */ - private static final int ENTRY_COUNT = 500000; + /** Random number generator. */ + private static final Random RAND = new Random(); - /** Heap size required to run this example. */ - public static final int MIN_MEMORY = 512 * 1024 * 1024; + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; /** Streaming server host. */ private static final String HOST = "localhost"; @@ -57,44 +64,38 @@ public class SocketStreamerExample { * @param args Command line arguments, none required. * @throws IgniteException If example execution failed. */ - public static void main(String[] args) throws IgniteException { - ExamplesUtils.checkMinMemory(MIN_MEMORY); + public static void main(String[] args) throws IgniteException, InterruptedException { + // Mark this cluster member as client. + Ignition.setClientMode(true); - try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { - System.out.println(); - System.out.println(">>> Cache data streamer example started."); + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; startServer(); - // Clean up caches on all nodes before run. - ignite.cache(CACHE_NAME).clear(); + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); - System.out.println(); - System.out.println(">>> Cache clear finished."); + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); - long start = System.currentTimeMillis(); - - try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(CACHE_NAME)) { - // Configure loader. - stmr.perNodeBufferSize(1024); - stmr.perNodeParallelOperations(8); - - IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>> converter = - new IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>>() { - @Override public Map.Entry<Integer, String> apply(IgniteBiTuple<Integer, String> input) { + IgniteClosure<IgniteBiTuple<Integer, Long>, Map.Entry<Integer, Long>> converter = + new IgniteClosure<IgniteBiTuple<Integer, Long>, Map.Entry<Integer, Long>>() { + @Override public Map.Entry<Integer, Long> apply(IgniteBiTuple<Integer, Long> input) { return new IgniteBiTuple<>(input.getKey(), input.getValue()); } }; - IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr = + IgniteSocketStreamer<IgniteBiTuple<Integer, Long>, Integer, Long> sockStmr = new IgniteSocketStreamer<>(HOST, PORT, stmr, converter); sockStmr.start(); - } - long end = System.currentTimeMillis(); - - System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); + while(true) + Thread.sleep(1000); + } } } @@ -112,8 +113,16 @@ public class SocketStreamerExample { ObjectOutputStream oos = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) { - for (int i = 0; i < ENTRY_COUNT; i++) - oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i))); + while(true) { + oos.writeObject(new IgniteBiTuple<>(RAND.nextInt(RANGE), (long) (RAND.nextInt(RANGE) + 1))); + + try { + Thread.sleep(1); + } + catch (InterruptedException e) { + // No-op. + } + } } catch (IOException e) { // No-op. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java index 6731a3c..d137d17 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java @@ -18,30 +18,39 @@ package org.apache.ignite.examples.streaming; import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.streaming.*; - -import java.io.*; -import java.net.*; -import java.util.*; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.examples.ExamplesUtils; +import org.apache.ignite.examples.streaming.numbers.CacheConfig; +import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.streaming.IgniteTextSocketStreamer; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Map; +import java.util.Random; /** - * Demonstrates how cache can be populated with data utilizing {@link IgniteTextSocketStreamer} API. - * <p> - * Remote nodes should always be started with special configuration file which - * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-cache.xml'}. + * Stream random numbers into the streaming cache. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * <li>Start streaming using {@link TextSocketStreamerExample}.</li> + * </ul> * <p> - * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will - * start node with {@code examples/config/example-cache.xml} configuration. + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. */ public class TextSocketStreamerExample { - /** Number of entries to load. */ - private static final int ENTRY_COUNT = 500000; + /** Random number generator. */ + private static final Random RAND = new Random(); - /** Heap size required to run this example. */ - public static final int MIN_MEMORY = 512 * 1024 * 1024; + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; /** Streaming server host. */ private static final String HOST = "localhost"; @@ -55,56 +64,36 @@ public class TextSocketStreamerExample { * @param args Command line arguments, none required. * @throws IgniteException If example execution failed. */ - public static void main(String[] args) throws IgniteException { - ExamplesUtils.checkMinMemory(MIN_MEMORY); - - try (Ignite ignite = Ignition.start("examples/config/example-cache.xml")) { - System.out.println(); - System.out.println(">>> Cache data streamer example started."); + public static void main(String[] args) throws IgniteException, InterruptedException { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; startServer(); - // Clean up caches on all nodes before run. - ignite.cache(null).clear(); - - System.out.println(); - System.out.println(">>> Cache clear finished."); + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); - long start = System.currentTimeMillis(); + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); - try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) { - // Configure loader. - stmr.perNodeBufferSize(1024); - stmr.perNodeParallelOperations(8); - - IgniteClosure<String, Map.Entry<Integer, String>> converter = - new IgniteClosure<String, Map.Entry<Integer, String>>() { - @Override public Map.Entry<Integer, String> apply(String input) { + IgniteClosure<String, Map.Entry<Integer, Long>> converter = + new IgniteClosure<String, Map.Entry<Integer, Long>>() { + @Override public Map.Entry<Integer, Long> apply(String input) { String[] pair = input.split("="); - return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]); + return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1])); } }; - IgniteTextSocketStreamer<Integer, String> sockStmr = + IgniteTextSocketStreamer<Integer, Long> sockStmr = new IgniteTextSocketStreamer<>(HOST, PORT, stmr, converter); sockStmr.start(); - //TODO: wait ??? - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - sockStmr.stop(); + while(true) + Thread.sleep(1000); } - - long end = System.currentTimeMillis(); - - System.out.println(">>> Cache Size " + ignite.cache(null).size(CachePeekMode.PRIMARY)); - - System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + (end - start) + "ms."); } } @@ -122,12 +111,22 @@ public class TextSocketStreamerExample { BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) { - for (int i = 0; i < ENTRY_COUNT; i++) { - String num = Integer.toString(i); - writer.write(num + '=' + num); + while(true) { + int key = RAND.nextInt(RANGE); + + int value = RAND.nextInt(RANGE) + 1; + + writer.write(Integer.toString(key) + '=' + Integer.toString(value)); writer.newLine(); + + try { + Thread.sleep(1); + } + catch (InterruptedException e) { + // No-op. + } } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java index dc53e0f..72b6082 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java @@ -33,7 +33,7 @@ import java.util.*; * @param <K> Cache entry key type. * @param <V> Cache entry value type. */ -public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> { +public class IgniteSocketStreamer<E, K, V> extends Receiver<E, K, V> { /** Host. */ private final String host; @@ -63,9 +63,9 @@ public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> { } /** {@inheritDoc} */ - @Override protected void loadData() { + @Override protected void receive() { try (Socket sock = new Socket(host, port)) { - loadData(sock); + receive(sock); } catch (Exception e) { throw new IgniteException(e); @@ -73,12 +73,12 @@ public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> { } /** - * Reads data from socket and loads them into target data stream. + * Reads data from socket and adds them into target data stream. * * @param sock Socket. */ @SuppressWarnings("unchecked") - private void loadData(Socket sock) throws IOException { + private void receive(Socket sock) throws IOException { try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()))) { while (!isStopped()) { try { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java index a43d26b..8094d37 100644 --- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java @@ -32,7 +32,7 @@ import java.util.*; * @param <K> Cache entry key type. * @param <V> Cache entry value type. */ -public class IgniteTextSocketStreamer<K, V> extends StreamReceiver<String, K, V> { +public class IgniteTextSocketStreamer<K, V> extends Receiver<String, K, V> { /** Host. */ private final String host; @@ -62,7 +62,7 @@ public class IgniteTextSocketStreamer<K, V> extends StreamReceiver<String, K, V> } /** {@inheritDoc} */ - @Override protected void loadData() { + @Override protected void receive() { try (Socket sock = new Socket(host, port)) { loadData(sock); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java new file mode 100644 index 0000000..71a59bf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * Base implementation of data receiver. + * + * @param <E> Type of stream element. + * @param <K> Type of cache entry key. + * @param <V> Type of cache entry value. + */ +public abstract class Receiver<E, K, V> { + /** Object monitor. */ + private final Object lock = new Object(); + + /** Stop latch. */ + private final CountDownLatch stopLatch = new CountDownLatch(1); + + /** State. */ + private volatile State state = State.INITIALIZED; + + /** Target streamer. */ + private final IgniteDataStreamer<K, V> streamer; + + /** Element to entries transformer. */ + private final IgniteClosure<E, Map.Entry<K, V>> converter; + + /** Restart interval in milliseconds. */ + private volatile long restartInterval = 2000; + + /** + * Constructs stream receiver. + * + * @param streamer Streamer. + * @param converter Element to entries transformer. + */ + public Receiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter) { + A.notNull(streamer, "streamer is null"); + A.notNull(converter, "converter is null"); + + this.streamer = streamer; + this.converter = converter; + } + + /** + * Sets restart interval in milliseconds. + * + * @param interval Interval in milliseconds. + */ + public void restartInterval(long interval) { + A.ensure(interval > 0, "interval > 0"); + + this.restartInterval = interval; + } + + /** + * Starts receiver. + */ + public void start() { + synchronized (lock) { + if (state != State.INITIALIZED) + throw new IllegalStateException("Receiver in " + state + " state can't be started."); + + new Thread(new ReceiverWorker()).start(); + + state = State.STARTED; + } + } + + /** + * Stops receiver. + */ + public void stop() { + synchronized (lock) { + if (state != State.STARTED) + throw new IllegalStateException("Receiver in " + state + " state can't be stopped."); + + state = State.STOPPED; + + try { + stopLatch.await(); + } + catch (InterruptedException e) { + // No-op. + } + } + } + + /** + * Checks whether receiver is started or not. + * + * @return {@code True} if receiver is started, {@code false} - otherwise. + */ + public boolean isStarted() { + return state == State.STARTED; + } + + /** + * Checks whether receiver is stopped or not. + * + * @return {@code True} if receiver is stopped, {@code false} - otherwise. + */ + public boolean isStopped() { + return state == State.STOPPED; + } + + /** + * Performs actual data receiving. + */ + protected abstract void receive(); + + /** + * Convert stream data to cache entry and transfer it to the target streamer. + * + * @param element Element. + */ + protected void addData(E element) { + streamer.addData(converter.apply(element)); + } + + /** + * Receiver state. + */ + public enum State { + /** New. */ + INITIALIZED, + /** Started. */ + STARTED, + /** Stopped. */ + STOPPED + } + + /** + * Receiver worker that actually receives data from socket. + */ + private class ReceiverWorker implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + while (true) { + try { + receive(); + } + catch (Throwable e) { + // No-op. + } + + if (isStopped()) { + stopLatch.countDown(); + + break; + } + + try { + Thread.sleep(restartInterval); + } + catch (InterruptedException e) { + // No-op. + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java deleted file mode 100644 index 50719a4..0000000 --- a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; - -import java.util.*; -import java.util.concurrent.*; - -/** - * Base implementation of stream receiver. - * - * @param <E> Type of stream element. - * @param <K> Type of cache entry key. - * @param <V> Type of cache entry value/ - */ -public abstract class StreamReceiver<E, K, V> { - /** Object monitor. */ - private final Object lock = new Object(); - - /** Stop latch. */ - private final CountDownLatch stopLatch = new CountDownLatch(1); - - /** State. */ - private volatile State state = State.INITIALIZED; - - /** Target streamer. */ - private final IgniteDataStreamer<K, V> streamer; - - /** Element to entries transformer. */ - private final IgniteClosure<E, Map.Entry<K, V>> converter; - - /** - * Constructs stream receiver. - * - * @param streamer Streamer. - * @param converter Element to entries transformer. - */ - public StreamReceiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter) { - A.notNull(streamer, "streamer is null"); - A.notNull(converter, "converter is null"); - - this.streamer = streamer; - this.converter = converter; - } - - /** - * Starts streamer. - */ - public void start() { - synchronized (lock) { - if (state != State.INITIALIZED) - throw new IllegalStateException("Receiver in " + state + " state can't be started."); - - new Thread(new Receiver()).start(); - - state = State.STARTED; - } - } - - /** - * Stops streamer. - */ - public void stop() { - synchronized (lock) { - if (state != State.STARTED) - throw new IllegalStateException("Receiver in " + state + " state can't be stopped."); - - state = State.STOPPED; - - try { - stopLatch.await(); - } - catch (InterruptedException e) { - // No-op. - } - } - } - - /** - * Returns stream receiver state. - * - * @return stream receiver state. - */ - public State state() { - return state; - } - - /** - * Checks whether receiver is started or not. - * - * @return {@code True} if receiver is started, {@code false} - otherwise. - */ - public boolean isStarted() { - return state == State.STARTED; - } - - /** - * Checks whether receiver is stopped or not. - * - * @return {@code True} if receiver is stopped, {@code false} - otherwise. - */ - public boolean isStopped() { - return state == State.STOPPED; - } - - /** - * Performs actual loading of data. Override this method in order to implement own data loading functionality. - */ - protected abstract void loadData(); - - /** - * Convert stream data to cache entry and transfer it to the target streamer. - * - * @param element Element. - */ - protected void addData(E element) { - streamer.addData(converter.apply(element)); - } - - /** - * Receiver state. - */ - public enum State { - /** New. */ - INITIALIZED, - /** Started. */ - STARTED, - /** Stopped. */ - STOPPED - } - - /** - * Receiver worker that actually receives data from socket. - */ - private class Receiver implements Runnable { - /** {@inheritDoc} */ - @Override public void run() { - try { - loadData(); - } - catch (Throwable e) { - //TODO: restart - } - finally { - stopLatch.countDown(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java index 9164352..d7357c7 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java @@ -40,7 +40,7 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest { private static final int PORT = 5555; /** Entry count. */ - private static final int ENTRY_CNT = 50000; + private static final int ENTRY_CNT = 5000; /** {@inheritDoc} */ @SuppressWarnings("unchecked") @@ -96,7 +96,7 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest { // Wait for all data streamed. while (cnt.get() < ENTRY_CNT) - Thread.sleep(200); + Thread.sleep(100); sockStmr.stop(); @@ -105,7 +105,8 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest { } assertEquals(ENTRY_CNT, cache.size()); - } finally { + } + finally { stopAllGrids(); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java index 635b983..436bc8f 100644 --- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java @@ -23,7 +23,7 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { private static final int PORT = 5555; /** Entry count. */ - private static final int ENTRY_CNT = 50000; + private static final int ENTRY_CNT = 5000; /** {@inheritDoc} */ @SuppressWarnings("unchecked") @@ -79,7 +79,7 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest { // Wait for all data streamed. while (cnt.get() < ENTRY_CNT) - Thread.sleep(200); + Thread.sleep(100); sockStmr.stop(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java new file mode 100644 index 0000000..9f4e056 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java @@ -0,0 +1,203 @@ +package org.apache.ignite.streaming; + +import org.apache.ignite.*; +import org.apache.ignite.lang.*; + +import junit.framework.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Tests for {@link Receiver}. + */ +public class ReceiverTest extends TestCase { + /** Converter. */ + private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER = + new IgniteClosure<Integer, Map.Entry<Integer, String>>() { + @Override public Map.Entry<Integer, String> apply(Integer input) { + return new IgniteBiTuple<>(input, input.toString()); + } + }; + + /** Stmr. */ + private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>(); + + /** Receiver. */ + private final Receiver<Integer, Integer, String> receiver = + new Receiver<Integer, Integer, String>(STMR, CONVERTER) { + @Override protected void receive() { + while (!isStopped()) { + try { + Thread.sleep(50); + } + catch (InterruptedException e) { + // No-op. + } + } + } + }; + + /** + * Tests receiver behavior in case of forced termination. + * + * @throws Exception If error occurred. + */ + public void testReceiver() throws Exception { + assertFalse(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + receiver.start(); + + assertTrue(receiver.isStarted()); + assertFalse(receiver.isStopped()); + + // Wait for some period before stop. + Thread.sleep(500); + + receiver.stop(); + + assertFalse(receiver.isStarted()); + assertTrue(receiver.isStopped()); + + try { + receiver.start(); + fail("IllegalStateException expected."); + } + catch (IllegalStateException e) { + // No-op + } + + try { + receiver.stop(); + fail("IllegalStateException expected."); + } + catch (IllegalStateException e) { + // No-op + } + } + + /** + * Receiver stub. + * + * @param <K> Key type. + * @param <V> Value type. + */ + private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> { + /** {@inheritDoc} */ + @Override public String cacheName() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean allowOverwrite() { + return false; + } + + /** {@inheritDoc} */ + @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean skipStore() { + return false; + } + + /** {@inheritDoc} */ + @Override public void skipStore(boolean skipStore) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int perNodeBufferSize() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void perNodeBufferSize(int bufSize) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int perNodeParallelOperations() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void perNodeParallelOperations(int parallelOps) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public long autoFlushFrequency() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void autoFlushFrequency(long autoFlushFreq) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> future() { + return null; + } + + /** {@inheritDoc} */ + @Override public void deployClass(Class<?> depCls) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) + throws IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { + return null; + } + + /** {@inheritDoc} */ + @Override public void flush() throws IgniteException, IllegalStateException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void tryFlush() throws IgniteException, IllegalStateException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void close(boolean cancel) throws IgniteException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void close() throws IgniteException { + // No-op. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java deleted file mode 100644 index 4860b97..0000000 --- a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java +++ /dev/null @@ -1,193 +0,0 @@ -package org.apache.ignite.streaming; - -import org.apache.ignite.*; -import org.apache.ignite.lang.*; - -import junit.framework.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Tests for {@link StreamReceiver}. - */ -public class StreamReceiverTest extends TestCase { - /** Converter. */ - private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER = - new IgniteClosure<Integer, Map.Entry<Integer, String>>() { - @Override public Map.Entry<Integer, String> apply(Integer input) { - return new IgniteBiTuple<>(input, input.toString()); - } - }; - - /** Stmr. */ - private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>(); - - /** Receiver. */ - private final StreamReceiver<Integer, Integer, String> receiver = - new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) { - @Override protected void loadData() { - while (!isStopped()) { - try { - Thread.sleep(50); - } - catch (InterruptedException e) { - // No-op. - } - } - } - }; - - /** - * Tests receiver behavior in case of forced termination. - * - * @throws Exception If error occurred. - */ - public void testReceiver() throws Exception { - assertEquals(StreamReceiver.State.INITIALIZED, receiver.state()); - assertFalse(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - receiver.start(); - - assertEquals(StreamReceiver.State.STARTED, receiver.state()); - - assertTrue(receiver.isStarted()); - assertFalse(receiver.isStopped()); - - // Wait for some period before stop. - Thread.sleep(500); - - receiver.stop(); - - assertEquals(StreamReceiver.State.STOPPED, receiver.state()); - - assertFalse(receiver.isStarted()); - assertTrue(receiver.isStopped()); - } - - /** - * Receiver stub. - * - * @param <K> Key type. - * @param <V> Value type. - */ - private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> { - - /** {@inheritDoc} */ - @Override public String cacheName() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean allowOverwrite() { - return false; - } - - /** {@inheritDoc} */ - @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean skipStore() { - return false; - } - - /** {@inheritDoc} */ - @Override public void skipStore(boolean skipStore) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int perNodeBufferSize() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void perNodeBufferSize(int bufSize) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int perNodeParallelOperations() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void perNodeParallelOperations(int parallelOps) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public long autoFlushFrequency() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void autoFlushFrequency(long autoFlushFreq) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> future() { - return null; - } - - /** {@inheritDoc} */ - @Override public void deployClass(Class<?> depCls) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) - throws IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException { - return null; - } - - /** {@inheritDoc} */ - @Override public void flush() throws IgniteException, IllegalStateException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void tryFlush() throws IgniteException, IllegalStateException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void close(boolean cancel) throws IgniteException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void close() throws IgniteException { - // No-op. - } - } -} \ No newline at end of file