Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430-1 fd401bfb9 -> 3954773b4 (forced update)


ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3954773b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3954773b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3954773b

Branch: refs/heads/ignite-430-1
Commit: 3954773b4507855c2dd4bc0567ad95eefa30dba1
Parents: 742bd4a
Author: agura <[email protected]>
Authored: Mon Apr 13 18:28:40 2015 +0300
Committer: agura <[email protected]>
Committed: Tue Apr 14 02:18:22 2015 +0300

----------------------------------------------------------------------
 .../streaming/socket/SocketStreamerExample.java | 156 ++++++++++++++
 .../socket/ZStringsSocketStreamerExample.java   | 149 +++++++++++++
 .../examples/streaming/socket/package-info.java |  21 ++
 .../internal/util/nio/GridDelimitedParser.java  |  91 ++++++++
 .../util/nio/GridNioDelimitedBuffer.java        |  98 +++++++++
 .../stream/socket/IgniteSocketStreamer.java     | 214 +++++++++++++++++++
 .../stream/socket/SocketMessageConverter.java   |  32 +++
 .../ignite/stream/socket/package-info.java      |  21 ++
 8 files changed, 782 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
new file mode 100644
index 0000000..1ee916f
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Streams random numbers into the streaming cache using {@link 
IgniteSocketStreamer}.
+ * To start the example, you should:
+ * <ul>
+ *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *      <li>Start streaming using {@link SocketStreamerExample}.</li>
+ *      <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class SocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, 
IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the 
same number.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, 
Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new 
IgniteSocketStreamer<>();
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setStreamer(stmr);
+
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, 
Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(Tuple 
tuple) {
+                        return new IgniteBiTuple<>(tuple.key, tuple.cnt);
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * @param addr Address.
+     * @param port Port.
+     */
+    private static void sendData(InetAddress addr, int port) throws 
IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new 
BufferedOutputStream(sock.getOutputStream())) {
+            while (true) {
+                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
+                    Tuple tuple = new Tuple(RAND.nextInt(RANGE), 1L);
+
+                    out.writeObject(tuple);
+
+                    byte[] arr = bos.toByteArray();
+
+                    oos.write(arr.length >>> 24);
+                    oos.write(arr.length >>> 16);
+                    oos.write(arr.length >>> 8);
+                    oos.write(arr.length);
+
+                    oos.write(arr);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tuple.
+     */
+    private static class Tuple implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** Key. */
+        private final int key;
+
+        /** Count. */
+        private final long cnt;
+
+        /**
+         * @param key Key.
+         * @param cnt Count.
+         */
+        public Tuple(int key, long cnt) {
+            this.key = key;
+            this.cnt = cnt;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
new file mode 100644
index 0000000..9fd229e
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Streams random numbers into the streaming cache using {@link 
IgniteSocketStreamer}.
+ * <p>
+ * This example illustrates usage of the TCP socket streamer with non-Java 
clients. In this example the client streams
+ * zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *      <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
+ *      <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class ZStringsSocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, 
IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the 
same number.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, 
Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<String, Integer, Long> sockStmr = new 
IgniteSocketStreamer<>();
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setDelimiter(DELIM);
+
+                sockStmr.setStreamer(stmr);
+
+                // Converter from zero-terminated strings to Java strings.
+                sockStmr.setConverter(new SocketMessageConverter<String>() {
+                    @Override public String convert(byte[] msg) {
+                        try {
+                            return new String(msg, "ASCII");
+                        }
+                        catch (UnsupportedEncodingException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                });
+
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, 
Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(String 
input) {
+                        String[] pair = input.split("=");
+                        return new IgniteBiTuple<>(Integer.parseInt(pair[0]), 
Long.parseLong(pair[1]));
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * @param addr Address.
+     * @param port Port.
+     */
+    private static void sendData(InetAddress addr, int port) throws 
IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new 
BufferedOutputStream(sock.getOutputStream())) {
+
+            while (true) {
+                int key = RAND.nextInt(RANGE);
+
+                String str = key + "=1";
+
+                byte[] arr = str.getBytes("ASCII");
+
+                oos.write(arr);
+                oos.write(DELIM);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
new file mode 100644
index 0000000..ae7bdf9
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage 
examples.
+ */
+package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
new file mode 100644
index 0000000..d49ddd4
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is 
followed by
+ * a delimiter (byte array). So, the stream structure is as follows:
+ * <pre>
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ *     |   MESSAGE  | DELIMITER  |  MESSAGE  | DELIMITER  |
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * </pre>
+ */
+public class GridDelimitedParser implements GridNioParser {
+    /** Buffer metadata key. */
+    private static final int BUF_META_KEY = 
GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Direct buffer. */
+    private final boolean directBuf;
+
+    /**
+     * @param delim Delimiter.
+     * @param directBuf Direct buffer.
+     */
+    public GridDelimitedParser(byte[] delim, boolean directBuf) {
+        this.delim = delim;
+        this.directBuf = directBuf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws 
IOException, IgniteCheckedException {
+        GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY);
+
+        // Decoding for a given session is performed by a single thread, so there 
should not be any concurrency issues.
+        // However, we still assert below that no other buffer was registered for the session.
+        if (nioBuf == null) {
+            nioBuf = new GridNioDelimitedBuffer(delim);
+
+            GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
+
+            assert old == null;
+        }
+
+        return nioBuf.read(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws 
IOException, IgniteCheckedException {
+        byte[] msg0 = (byte[])msg;
+
+        int capacity = msg0.length + delim.length;
+        ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(capacity) : 
ByteBuffer.allocate(capacity);
+
+        res.put(msg0);
+        res.put(delim);
+
+        res.flip();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return this.getClass().getSimpleName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
new file mode 100644
index 0000000..5f895ee
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Buffer with message delimiter support.
+ */
+public class GridNioDelimitedBuffer {
+    /** Buffer size. */
+    private static final int BUFFER_SIZE = 512;
+
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Data. */
+    private byte[] data;
+
+    /** Count. */
+    private int cnt;
+
+    /**
+     * @param delim Delimiter.
+     */
+    public GridNioDelimitedBuffer(byte[] delim) {
+        assert delim != null;
+        assert delim.length > 0;
+
+        this.delim = delim;
+
+        reset();
+    }
+
+    private void reset() {
+        cnt = 0;
+
+        data = new byte[BUFFER_SIZE];
+    }
+
+    /**
+     * @param buf Buffer.
+     * @return Message bytes or {@code null} if message is not fully read yet.
+     */
+    @Nullable public byte[] read(ByteBuffer buf) {
+        for(; buf.hasRemaining();) {
+
+            if (cnt == data.length)
+                data = Arrays.copyOf(data, data.length * 2);
+
+            data[cnt++] = buf.get();
+
+            if (cnt >= delim.length && found()) {
+                byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
+
+                reset();
+
+                return bytes;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Checks whether the most recently buffered bytes match the delimiter.
+     *
+     * @return {@code True} if delimiter found, {@code false} - otherwise.
+     */
+    private boolean found() {
+        int from = cnt - delim.length;
+
+        for (int i = 0; i < delim.length ; i++) {
+            if (delim[i] != data[from + i])
+                return false;
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
new file mode 100644
index 0000000..43754ff
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.logger.java.*;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.ignite.stream.adapters.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs 
and streams into
+ * {@link IgniteDataStreamer} instance.
+ */
+public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Default port. */
+    private static final int DFLT_PORT = 5555;
+
+    /** Default threads. */
+    private static final int DFLT_THREADS = 
Runtime.getRuntime().availableProcessors();
+
+    /** Logger. */
+    private final IgniteLogger log = new JavaLogger(false) {
+        @Override public boolean isDebugEnabled() {
+            return true;
+        }
+    };
+
+    /** Address. */
+    private InetAddress addr;
+
+    /** Server port. */
+    private int port = DFLT_PORT;
+
+    /** Threads number. */
+    private int threads = DFLT_THREADS;
+
+    /** Direct mode. */
+    private boolean directMode;
+
+    /** Delimiter. */
+    private byte[] delim;
+
+    /** Converter. */
+    private SocketMessageConverter<T> converter;
+
+    /** Server. */
+    private GridNioServer<byte[]> srv;
+
+    /**
+     * Sets server address.
+     *
+     * @param addr Address.
+     */
+    public void setAddr(InetAddress addr) {
+        this.addr = addr;
+    }
+
+    /**
+     * Sets port number.
+     *
+     * @param port Port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets the number of threads (used as the server's selector count).
+     *
+     * @param threads Threads.
+     */
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets direct mode flag.
+     *
+     * @param directMode Direct mode.
+     */
+    public void setDirectMode(boolean directMode) {
+        this.directMode = directMode;
+    }
+
+    /**
+     * Sets message delimiter.
+     *
+     * @param delim Delimiter.
+     */
+    public void setDelimiter(byte[] delim) {
+        this.delim = delim;
+    }
+
+    /**
+     * Sets message converter.
+     *
+     * @param converter Converter.
+     */
+    public void setConverter(SocketMessageConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        GridNioServerListener<byte[]> lsnr = new 
GridNioServerListenerAdapter<byte[]>() {
+            @Override public void onConnected(GridNioSession ses) {
+                assert ses.accepted();
+
+                if (log.isDebugEnabled())
+                    log.debug("Accepted connection: " + ses.remoteAddress());
+            }
+
+            @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
+                if (e != null)
+                    log.error("Connection failed with exception", e);
+            }
+
+            @Override public void onMessage(GridNioSession ses, byte[] msg) {
+                T obj = converter.convert(msg);
+
+                Map.Entry<K, V> e = getTupleExtractor().extract(obj);
+
+                getStreamer().addData(e);
+            }
+        };
+
+        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+        GridNioParser parser = F.isEmpty(delim) ? new 
GridBufferedParser(directMode, byteOrder) :
+            new GridDelimitedParser(delim, directMode);
+
+        if (converter == null)
+            converter = new DefaultConverter<>();
+
+        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+        GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+        try {
+            srv = new GridNioServer.Builder<byte[]>()
+                .address(addr == null ? InetAddress.getLocalHost() : addr)
+                .port(port)
+                .listener(lsnr)
+                .logger(log)
+                .selectorCount(threads)
+                .byteOrder(byteOrder)
+                .filters(filters)
+                .build();
+        }
+        catch (IgniteCheckedException | UnknownHostException e) {
+            throw new IgniteException(e);
+        }
+
+        srv.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server started on " + addr + ':' + 
port);
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        srv.stop();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server stopped");
+    }
+
+    /**
+     * Converts a message to a Java object using the JDK marshaller.
+     */
+    private static class DefaultConverter<T> implements 
SocketMessageConverter<T> {
+        /** Marshaller. */
+        private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public T convert(byte[] msg) {
+            try {
+                return MARSH.unmarshal(msg, null);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
new file mode 100644
index 0000000..0a70a33
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+/**
+ * Socket message converter.
+ */
+public interface SocketMessageConverter<T> {
+
+    /**
+     * Converts a message represented by an array of bytes to an object.
+     *
+     * @param msg Message.
+     * @return Converted object.
+     */
+    public T convert(byte[] msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..e1cef65
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains socket streamer implementation.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file

Reply via email to