ignite-430 IgniteSocketStreamer to stream data from TCP socket.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0de79233
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0de79233
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0de79233

Branch: refs/heads/ignite-430
Commit: 0de79233da7f19863f9f7aabb6f55c3324db18c4
Parents: ea1d621
Author: agura <ag...@gridgain.com>
Authored: Mon Mar 30 02:32:49 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Mon Mar 30 02:32:49 2015 +0300

----------------------------------------------------------------------
 .../streaming/SocketStreamerExample.java        | 101 ++++-----
 .../streaming/TextSocketStreamerExample.java    | 113 +++++------
 .../ignite/streaming/IgniteSocketStreamer.java  |  10 +-
 .../streaming/IgniteTextSocketStreamer.java     |   4 +-
 .../org/apache/ignite/streaming/Receiver.java   | 184 +++++++++++++++++
 .../apache/ignite/streaming/StreamReceiver.java | 167 ---------------
 .../streaming/IgniteSocketStreamerTest.java     |   7 +-
 .../streaming/IgniteTextSocketStreamerTest.java |   4 +-
 .../apache/ignite/streaming/ReceiverTest.java   | 203 +++++++++++++++++++
 .../ignite/streaming/StreamReceiverTest.java    | 193 ------------------
 10 files changed, 511 insertions(+), 475 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
index cf24455..292bc28 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
@@ -18,32 +18,39 @@
 package org.apache.ignite.examples.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streaming.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.examples.ExamplesUtils;
+import org.apache.ignite.examples.streaming.numbers.CacheConfig;
+import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.streaming.IgniteSocketStreamer;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Map;
+import java.util.Random;
 
 /**
- * Demonstrates how cache can be populated with data utilizing {@link 
IgniteSocketStreamer} API.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
+ * Stream random numbers into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
+ *     <li>Start streaming using {@link SocketStreamerExample}.</li>
+ * </ul>
  * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
- * start node with {@code examples/config/example-cache.xml} configuration.
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
  */
 public class SocketStreamerExample {
-    /** Cache name. */
-    private static final String CACHE_NAME = "partitioned";
-
-    /** Number of entries to load. */
-    private static final int ENTRY_COUNT = 500000;
+    /** Random number generator. */
+    private static final Random RAND = new Random();
 
-    /** Heap size required to run this example. */
-    public static final int MIN_MEMORY = 512 * 1024 * 1024;
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
 
     /** Streaming server host. */
     private static final String HOST = "localhost";
@@ -57,44 +64,38 @@ public class SocketStreamerExample {
      * @param args Command line arguments, none required.
      * @throws IgniteException If example execution failed.
      */
-    public static void main(String[] args) throws IgniteException {
-        ExamplesUtils.checkMinMemory(MIN_MEMORY);
+    public static void main(String[] args) throws IgniteException, 
InterruptedException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
 
-        try (Ignite ignite = 
Ignition.start("examples/config/example-cache.xml")) {
-            System.out.println();
-            System.out.println(">>> Cache data streamer example started.");
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
 
             startServer();
 
-            // Clean up caches on all nodes before run.
-            ignite.cache(CACHE_NAME).clear();
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
 
-            System.out.println();
-            System.out.println(">>> Cache clear finished.");
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
 
-            long start = System.currentTimeMillis();
-
-            try (IgniteDataStreamer<Integer, String> stmr = 
ignite.dataStreamer(CACHE_NAME)) {
-                // Configure loader.
-                stmr.perNodeBufferSize(1024);
-                stmr.perNodeParallelOperations(8);
-
-                IgniteClosure<IgniteBiTuple<Integer, String>, 
Map.Entry<Integer, String>> converter =
-                    new IgniteClosure<IgniteBiTuple<Integer, String>, 
Map.Entry<Integer, String>>() {
-                        @Override public Map.Entry<Integer, String> 
apply(IgniteBiTuple<Integer, String> input) {
+                IgniteClosure<IgniteBiTuple<Integer, Long>, Map.Entry<Integer, 
Long>> converter =
+                    new IgniteClosure<IgniteBiTuple<Integer, Long>, 
Map.Entry<Integer, Long>>() {
+                        @Override public Map.Entry<Integer, Long> 
apply(IgniteBiTuple<Integer, Long> input) {
                             return new IgniteBiTuple<>(input.getKey(), 
input.getValue());
                         }
                     };
 
-                IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, 
String> sockStmr =
+                IgniteSocketStreamer<IgniteBiTuple<Integer, Long>, Integer, 
Long> sockStmr =
                     new IgniteSocketStreamer<>(HOST, PORT, stmr, converter);
 
                 sockStmr.start();
-            }
 
-            long end = System.currentTimeMillis();
-
-            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + 
(end - start) + "ms.");
+                while(true)
+                    Thread.sleep(1000);
+            }
         }
     }
 
@@ -112,8 +113,16 @@ public class SocketStreamerExample {
                      ObjectOutputStream oos =
                          new ObjectOutputStream(new 
BufferedOutputStream(sock.getOutputStream()))) {
 
-                    for (int i = 0; i < ENTRY_COUNT; i++)
-                        oos.writeObject(new IgniteBiTuple<>(i, 
Integer.toString(i)));
+                    while(true) {
+                        oos.writeObject(new 
IgniteBiTuple<>(RAND.nextInt(RANGE), (long) (RAND.nextInt(RANGE) + 1)));
+
+                        try {
+                            Thread.sleep(1);
+                        }
+                        catch (InterruptedException e) {
+                            // No-op.
+                        }
+                    }
                 }
                 catch (IOException e) {
                     // No-op.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
index 6731a3c..d137d17 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
@@ -18,30 +18,39 @@
 package org.apache.ignite.examples.streaming;
 
 import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.streaming.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.examples.ExamplesUtils;
+import org.apache.ignite.examples.streaming.numbers.CacheConfig;
+import org.apache.ignite.examples.streaming.numbers.QueryPopularNumbers;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.streaming.IgniteTextSocketStreamer;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Map;
+import java.util.Random;
 
 /**
- * Demonstrates how cache can be populated with data utilizing {@link 
IgniteTextSocketStreamer} API.
- * <p>
- * Remote nodes should always be started with special configuration file which
- * enables P2P class loading: {@code 'ignite.{sh|bat} 
examples/config/example-cache.xml'}.
+ * Stream random numbers into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *     <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
+ *     <li>Start streaming using {@link TextSocketStreamerExample}.</li>
+ * </ul>
  * <p>
- * Alternatively you can run {@link ExampleNodeStartup} in another JVM which 
will
- * start node with {@code examples/config/example-cache.xml} configuration.
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
  */
 public class TextSocketStreamerExample {
-    /** Number of entries to load. */
-    private static final int ENTRY_COUNT = 500000;
+    /** Random number generator. */
+    private static final Random RAND = new Random();
 
-    /** Heap size required to run this example. */
-    public static final int MIN_MEMORY = 512 * 1024 * 1024;
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
 
     /** Streaming server host. */
     private static final String HOST = "localhost";
@@ -55,56 +64,36 @@ public class TextSocketStreamerExample {
      * @param args Command line arguments, none required.
      * @throws IgniteException If example execution failed.
      */
-    public static void main(String[] args) throws IgniteException {
-        ExamplesUtils.checkMinMemory(MIN_MEMORY);
-
-        try (Ignite ignite = 
Ignition.start("examples/config/example-cache.xml")) {
-            System.out.println();
-            System.out.println(">>> Cache data streamer example started.");
+    public static void main(String[] args) throws IgniteException, 
InterruptedException {
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
 
             startServer();
 
-            // Clean up caches on all nodes before run.
-            ignite.cache(null).clear();
-
-            System.out.println();
-            System.out.println(">>> Cache clear finished.");
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
 
-            long start = System.currentTimeMillis();
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
 
-            try (IgniteDataStreamer<Integer, String> stmr = 
ignite.dataStreamer(null)) {
-                // Configure loader.
-                stmr.perNodeBufferSize(1024);
-                stmr.perNodeParallelOperations(8);
-
-                IgniteClosure<String, Map.Entry<Integer, String>> converter =
-                    new IgniteClosure<String, Map.Entry<Integer, String>>() {
-                        @Override public Map.Entry<Integer, String> 
apply(String input) {
+                IgniteClosure<String, Map.Entry<Integer, Long>> converter =
+                    new IgniteClosure<String, Map.Entry<Integer, Long>>() {
+                        @Override public Map.Entry<Integer, Long> apply(String 
input) {
                             String[] pair = input.split("=");
-                            return new 
IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]);
+                            return new 
IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
                         }
                 };
 
-                IgniteTextSocketStreamer<Integer, String> sockStmr =
+                IgniteTextSocketStreamer<Integer, Long> sockStmr =
                     new IgniteTextSocketStreamer<>(HOST, PORT, stmr, 
converter);
 
                 sockStmr.start();
 
-                //TODO: wait ???
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-
-                sockStmr.stop();
+                while(true)
+                    Thread.sleep(1000);
             }
-
-            long end = System.currentTimeMillis();
-
-            System.out.println(">>> Cache Size " + 
ignite.cache(null).size(CachePeekMode.PRIMARY));
-
-            System.out.println(">>> Loaded " + ENTRY_COUNT + " keys in " + 
(end - start) + "ms.");
         }
     }
 
@@ -122,12 +111,22 @@ public class TextSocketStreamerExample {
                      BufferedWriter writer =
                          new BufferedWriter(new 
OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) {
 
-                    for (int i = 0; i < ENTRY_COUNT; i++) {
-                        String num = Integer.toString(i);
 
-                        writer.write(num + '=' + num);
+                    while(true) {
+                        int key = RAND.nextInt(RANGE);
+
+                        int value = RAND.nextInt(RANGE) + 1;
+
+                        writer.write(Integer.toString(key) + '=' + 
Integer.toString(value));
 
                         writer.newLine();
+
+                        try {
+                            Thread.sleep(1);
+                        }
+                        catch (InterruptedException e) {
+                            // No-op.
+                        }
                     }
                 }
                 catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
index dc53e0f..72b6082 100644
--- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
@@ -33,7 +33,7 @@ import java.util.*;
  * @param <K> Cache entry key type.
  * @param <V> Cache entry value type.
  */
-public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> {
+public class IgniteSocketStreamer<E, K, V> extends Receiver<E, K, V> {
     /** Host. */
     private final String host;
 
@@ -63,9 +63,9 @@ public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override protected void loadData() {
+    @Override protected void receive() {
         try (Socket sock = new Socket(host, port)) {
-            loadData(sock);
+            receive(sock);
         }
         catch (Exception e) {
             throw new IgniteException(e);
@@ -73,12 +73,12 @@ public class IgniteSocketStreamer<E, K, V> extends StreamReceiver<E, K, V> {
     }
 
     /**
-     * Reads data from socket and loads them into target data stream.
+     * Reads data from socket and adds them into target data stream.
      *
      * @param sock Socket.
      */
     @SuppressWarnings("unchecked")
-    private void loadData(Socket sock) throws IOException {
+    private void receive(Socket sock) throws IOException {
         try (ObjectInputStream ois = new ObjectInputStream(new 
BufferedInputStream(sock.getInputStream()))) {
             while (!isStopped()) {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
index a43d26b..8094d37 100644
--- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
@@ -32,7 +32,7 @@ import java.util.*;
  * @param <K> Cache entry key type.
  * @param <V> Cache entry value type.
  */
-public class IgniteTextSocketStreamer<K, V> extends StreamReceiver<String, K, 
V> {
+public class IgniteTextSocketStreamer<K, V> extends Receiver<String, K, V> {
     /** Host. */
     private final String host;
 
@@ -62,7 +62,7 @@ public class IgniteTextSocketStreamer<K, V> extends StreamReceiver<String, K, V>
     }
 
     /** {@inheritDoc} */
-    @Override protected void loadData() {
+    @Override protected void receive() {
         try (Socket sock = new Socket(host, port)) {
             loadData(sock);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java
new file mode 100644
index 0000000..71a59bf
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Base implementation of data receiver.
+ *
+ * @param <E> Type of stream element.
+ * @param <K> Type of cache entry key.
+ * @param <V> Type of cache entry value.
+ */
+public abstract class Receiver<E, K, V> {
+    /** Object monitor. */
+    private final Object lock = new Object();
+
+    /** Stop latch. */
+    private final CountDownLatch stopLatch = new CountDownLatch(1);
+
+    /** State. */
+    private volatile State state = State.INITIALIZED;
+
+    /** Target streamer. */
+    private final IgniteDataStreamer<K, V> streamer;
+
+    /** Element to entries transformer. */
+    private final IgniteClosure<E, Map.Entry<K, V>> converter;
+
+    /** Restart interval in milliseconds. */
+    private volatile long restartInterval = 2000;
+
+    /**
+     * Constructs stream receiver.
+     *
+     * @param streamer Streamer.
+     * @param converter Element to entries transformer.
+     */
+    public Receiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, 
Map.Entry<K, V>> converter) {
+        A.notNull(streamer, "streamer is null");
+        A.notNull(converter, "converter is null");
+
+        this.streamer = streamer;
+        this.converter = converter;
+    }
+
+    /**
+     * Sets restart interval in milliseconds.
+     *
+     * @param interval Interval in milliseconds.
+     */
+    public void restartInterval(long interval) {
+        A.ensure(interval > 0, "interval > 0");
+
+        this.restartInterval = interval;
+    }
+
+    /**
+     * Starts receiver.
+     */
+    public void start() {
+        synchronized (lock) {
+            if (state != State.INITIALIZED)
+                throw new IllegalStateException("Receiver in " + state + " 
state can't be started.");
+
+            new Thread(new ReceiverWorker()).start();
+
+            state = State.STARTED;
+        }
+    }
+
+    /**
+     * Stops receiver.
+     */
+    public void stop() {
+        synchronized (lock) {
+            if (state != State.STARTED)
+                throw new IllegalStateException("Receiver in " + state + " 
state can't be stopped.");
+
+            state = State.STOPPED;
+
+            try {
+                stopLatch.await();
+            }
+            catch (InterruptedException e) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Checks whether receiver is started or not.
+     *
+     * @return {@code True} if receiver is started, {@code false} - otherwise.
+     */
+    public boolean isStarted() {
+        return state == State.STARTED;
+    }
+
+    /**
+     * Checks whether receiver is stopped or not.
+     *
+     * @return {@code True} if receiver is stopped, {@code false} - otherwise.
+     */
+    public boolean isStopped() {
+        return state == State.STOPPED;
+    }
+
+    /**
+     * Performs actual data receiving.
+     */
+    protected abstract void receive();
+
+    /**
+     * Convert stream data to cache entry and transfer it to the target 
streamer.
+     *
+     * @param element Element.
+     */
+    protected void addData(E element) {
+        streamer.addData(converter.apply(element));
+    }
+
+    /**
+     * Receiver state.
+     */
+    public enum State {
+        /** New. */
+        INITIALIZED,
+        /** Started. */
+        STARTED,
+        /** Stopped. */
+        STOPPED
+    }
+
+    /**
+     * Receiver worker that actually receives data from socket.
+     */
+    private class ReceiverWorker implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (true) {
+                try {
+                    receive();
+                }
+                catch (Throwable e) {
+                    // No-op.
+                }
+
+                if (isStopped()) {
+                    stopLatch.countDown();
+
+                    break;
+                }
+
+                try {
+                    Thread.sleep(restartInterval);
+                }
+                catch (InterruptedException e) {
+                    // No-op.
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
deleted file mode 100644
index 50719a4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streaming/StreamReceiver.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Base implementation of stream receiver.
- *
- * @param <E> Type of stream element.
- * @param <K> Type of cache entry key.
- * @param <V> Type of cache entry value/
- */
-public abstract class StreamReceiver<E, K, V> {
-    /** Object monitor. */
-    private final Object lock = new Object();
-
-    /** Stop latch. */
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
-
-    /** State. */
-    private volatile State state = State.INITIALIZED;
-
-    /** Target streamer. */
-    private final IgniteDataStreamer<K, V> streamer;
-
-    /** Element to entries transformer. */
-    private final IgniteClosure<E, Map.Entry<K, V>> converter;
-
-    /**
-     * Constructs stream receiver.
-     *
-     * @param streamer Streamer.
-     * @param converter Element to entries transformer.
-     */
-    public StreamReceiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, 
Map.Entry<K, V>> converter) {
-        A.notNull(streamer, "streamer is null");
-        A.notNull(converter, "converter is null");
-
-        this.streamer = streamer;
-        this.converter = converter;
-    }
-
-    /**
-     * Starts streamer.
-     */
-    public void start() {
-        synchronized (lock) {
-            if (state != State.INITIALIZED)
-                throw new IllegalStateException("Receiver in " + state + " 
state can't be started.");
-
-            new Thread(new Receiver()).start();
-
-            state = State.STARTED;
-        }
-    }
-
-    /**
-     * Stops streamer.
-     */
-    public void stop() {
-        synchronized (lock) {
-            if (state != State.STARTED)
-                throw new IllegalStateException("Receiver in " + state + " 
state can't be stopped.");
-
-            state = State.STOPPED;
-
-            try {
-                stopLatch.await();
-            }
-            catch (InterruptedException e) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
-     * Returns stream receiver state.
-     *
-     * @return stream receiver state.
-     */
-    public State state() {
-        return state;
-    }
-
-    /**
-     * Checks whether receiver is started or not.
-     *
-     * @return {@code True} if receiver is started, {@code false} - otherwise.
-     */
-    public boolean isStarted() {
-        return state == State.STARTED;
-    }
-
-    /**
-     * Checks whether receiver is stopped or not.
-     *
-     * @return {@code True} if receiver is stopped, {@code false} - otherwise.
-     */
-    public boolean isStopped() {
-        return state == State.STOPPED;
-    }
-
-    /**
-     * Performs actual loading of data. Override this method in order to 
implement own data loading functionality.
-     */
-    protected abstract void loadData();
-
-    /**
-     * Convert stream data to cache entry and transfer it to the target 
streamer.
-     *
-     * @param element Element.
-     */
-    protected void addData(E element) {
-        streamer.addData(converter.apply(element));
-    }
-
-    /**
-     * Receiver state.
-     */
-    public enum State {
-        /** New. */
-        INITIALIZED,
-        /** Started. */
-        STARTED,
-        /** Stopped. */
-        STOPPED
-    }
-
-    /**
-     * Receiver worker that actually receives data from socket.
-     */
-    private class Receiver implements Runnable {
-        /** {@inheritDoc} */
-        @Override public void run() {
-            try {
-                loadData();
-            }
-            catch (Throwable e) {
-                //TODO: restart
-            }
-            finally {
-                stopLatch.countDown();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
index 9164352..d7357c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
@@ -40,7 +40,7 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest {
     private static final int PORT = 5555;
 
     /** Entry count. */
-    private static final int ENTRY_CNT = 50000;
+    private static final int ENTRY_CNT = 5000;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
@@ -96,7 +96,7 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest {
 
                 // Wait for all data streamed.
                 while (cnt.get() < ENTRY_CNT)
-                    Thread.sleep(200);
+                    Thread.sleep(100);
 
                 sockStmr.stop();
 
@@ -105,7 +105,8 @@ public class IgniteSocketStreamerTest extends GridCommonAbstractTest {
             }
 
             assertEquals(ENTRY_CNT, cache.size());
-        } finally {
+        }
+        finally {
             stopAllGrids();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
index 635b983..436bc8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
@@ -23,7 +23,7 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest {
     private static final int PORT = 5555;
 
     /** Entry count. */
-    private static final int ENTRY_CNT = 50000;
+    private static final int ENTRY_CNT = 5000;
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
@@ -79,7 +79,7 @@ public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest {
 
                 // Wait for all data streamed.
                 while (cnt.get() < ENTRY_CNT)
-                    Thread.sleep(200);
+                    Thread.sleep(100);
 
                 sockStmr.stop();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java
new file mode 100644
index 0000000..9f4e056
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java
@@ -0,0 +1,203 @@
+package org.apache.ignite.streaming;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+
+import junit.framework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Tests for {@link Receiver}.
+ */
+public class ReceiverTest extends TestCase {
+    /** Converter. */
+    private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER =
+        new IgniteClosure<Integer, Map.Entry<Integer, String>>() {
+            @Override public Map.Entry<Integer, String> apply(Integer input) {
+                return new IgniteBiTuple<>(input, input.toString());
+            }
+    };
+
+    /** Stmr. */
+    private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>();
+
+    /** Receiver. */
+    private final Receiver<Integer, Integer, String> receiver =
+        new Receiver<Integer, Integer, String>(STMR, CONVERTER) {
+            @Override protected void receive() {
+                while (!isStopped()) {
+                    try {
+                        Thread.sleep(50);
+                    }
+                    catch (InterruptedException e) {
+                        // No-op.
+                    }
+                }
+            }
+        };
+
+    /**
+     * Tests receiver behavior in case of forced termination.
+     *
+     * @throws Exception If error occurred.
+     */
+    public void testReceiver() throws Exception {
+        assertFalse(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        receiver.start();
+
+        assertTrue(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        // Wait for some period before stop.
+        Thread.sleep(500);
+
+        receiver.stop();
+
+        assertFalse(receiver.isStarted());
+        assertTrue(receiver.isStopped());
+
+        try {
+            receiver.start();
+            fail("IllegalStateException expected.");
+        }
+        catch (IllegalStateException e) {
+            // No-op
+        }
+
+        try {
+            receiver.stop();
+            fail("IllegalStateException expected.");
+        }
+        catch (IllegalStateException e) {
+            // No-op
+        }
+    }
+
+    /**
+     * Receiver stub.
+     *
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> {
+        /** {@inheritDoc} */
+        @Override public String cacheName() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean allowOverwrite() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipStore() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void skipStore(boolean skipStore) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int perNodeBufferSize() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void perNodeBufferSize(int bufSize) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int perNodeParallelOperations() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void perNodeParallelOperations(int parallelOps) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public long autoFlushFrequency() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void autoFlushFrequency(long autoFlushFreq) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> future() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void deployClass(Class<?> depCls) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries)
+                throws IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void flush() throws IgniteException, IllegalStateException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void tryFlush() throws IgniteException, IllegalStateException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close(boolean cancel) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteException {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0de79233/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
deleted file mode 100644
index 4860b97..0000000
--- a/modules/core/src/test/java/org/apache/ignite/streaming/StreamReceiverTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-
-import junit.framework.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Tests for {@link StreamReceiver}.
- */
-public class StreamReceiverTest extends TestCase {
-    /** Converter. */
-    private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER =
-        new IgniteClosure<Integer, Map.Entry<Integer, String>>() {
-            @Override public Map.Entry<Integer, String> apply(Integer input) {
-                return new IgniteBiTuple<>(input, input.toString());
-            }
-    };
-
-    /** Stmr. */
-    private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>();
-
-    /** Receiver. */
-    private final StreamReceiver<Integer, Integer, String> receiver =
-        new StreamReceiver<Integer, Integer, String>(STMR, CONVERTER) {
-            @Override protected void loadData() {
-                while (!isStopped()) {
-                    try {
-                        Thread.sleep(50);
-                    }
-                    catch (InterruptedException e) {
-                        // No-op.
-                    }
-                }
-            }
-        };
-
-    /**
-     * Tests receiver behavior in case of forced termination.
-     *
-     * @throws Exception If error occurred.
-     */
-    public void testReceiver() throws Exception {
-        assertEquals(StreamReceiver.State.INITIALIZED, receiver.state());
-        assertFalse(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        receiver.start();
-
-        assertEquals(StreamReceiver.State.STARTED, receiver.state());
-
-        assertTrue(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        // Wait for some period before stop.
-        Thread.sleep(500);
-
-        receiver.stop();
-
-        assertEquals(StreamReceiver.State.STOPPED, receiver.state());
-
-        assertFalse(receiver.isStarted());
-        assertTrue(receiver.isStopped());
-    }
-
-    /**
-     * Receiver stub.
-     *
-     * @param <K> Key type.
-     * @param <V> Value type.
-     */
-    private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> {
-
-        /** {@inheritDoc} */
-        @Override public String cacheName() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean allowOverwrite() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean skipStore() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void skipStore(boolean skipStore) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public int perNodeBufferSize() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void perNodeBufferSize(int bufSize) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public int perNodeParallelOperations() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void perNodeParallelOperations(int parallelOps) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public long autoFlushFrequency() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void autoFlushFrequency(long autoFlushFreq) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> future() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void deployClass(Class<?> depCls) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries)
-                throws IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void flush() throws IgniteException, IllegalStateException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void tryFlush() throws IgniteException, IllegalStateException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close(boolean cancel) throws IgniteException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteException {
-            // No-op.
-        }
-    }
-}
\ No newline at end of file

Reply via email to