Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430-1 [created] ff30e7a94


ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff30e7a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff30e7a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff30e7a9

Branch: refs/heads/ignite-430-1
Commit: ff30e7a941484d55886e25c52ae111c2cf022c28
Parents: 742bd4a
Author: agura <[email protected]>
Authored: Mon Apr 13 18:28:40 2015 +0300
Committer: agura <[email protected]>
Committed: Mon Apr 13 19:06:26 2015 +0300

----------------------------------------------------------------------
 .../streaming/socket/SocketStreamerExample.java | 155 +++++++++++++++
 .../examples/streaming/socket/package-info.java |  21 +++
 .../stream/socket/IgniteSocketStreamer.java     | 187 +++++++++++++++++++
 .../ignite/stream/socket/package-info.java      |  21 +++
 4 files changed, 384 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
new file mode 100644
index 0000000..abaf2c5
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Stream random numbers into the streaming cache using {@link 
IgniteSocketStreamer}. To start the example, you should:
+ * <ul>
+ *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting 
remote nodes as specified below.</li>
+ *      <li>Start streaming using {@link SocketStreamerExample}.</li>
+ *      <li>Start querying popular numbers using {@link 
QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in 
another JVM.
+ */
+public class SocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, 
IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = 
Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of 
the streaming data.
+            IgniteCache<Integer, Long> stmCache = 
ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = 
ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the 
same word.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, 
Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new 
IgniteSocketStreamer<>();
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setStreamer(stmr);
+
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, 
Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(Tuple 
tuple) {
+                        return new IgniteBiTuple<>(tuple.key, tuple.cnt);
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * @param addr Address.
+     * @param port Port.
+     */
+    private static void sendData(InetAddress addr, int port) throws 
IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new 
BufferedOutputStream(sock.getOutputStream())) {
+            while (true) {
+                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
+                    Tuple tup = new Tuple(RAND.nextInt(RANGE), 1L);
+
+                    out.writeObject(tup);
+
+                    byte[] arr = bos.toByteArray();
+
+                    oos.write(arr.length >>> 24);
+                    oos.write(arr.length >>> 16);
+                    oos.write(arr.length >>> 8);
+                    oos.write(arr.length);
+
+                    oos.write(arr);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tuple.
+     */
+    private static class Tuple implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** Key. */
+        private final int key;
+
+        /** Count. */
+        private final long cnt;
+
+        /**
+         * @param key Key.
+         * @param cnt Count.
+         */
+        public Tuple(int key, long cnt) {
+            this.key = key;
+            this.cnt = cnt;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
new file mode 100644
index 0000000..ae7bdf9
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage 
examples.
+ */
+package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
new file mode 100644
index 0000000..18f1748
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.logger.java.*;
+import org.apache.ignite.stream.adapters.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.*;
+import java.util.*;
+
+/**
+ * TCP socket server that receives data from network, converts it to key-value 
pairs and streams into
+ * {@link IgniteDataStreamer} instance.
+ */
+public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Default port. */
+    private static final int DFLT_PORT = 5555;
+
+    /** Default threads. */
+    private static final int DFLT_THREADS = 
Runtime.getRuntime().availableProcessors();
+
+    /** Logger. */
+    private final IgniteLogger log = new JavaLogger(false) {
+        @Override public boolean isDebugEnabled() {
+            return true;
+        }
+    };
+
+    /** Address. */
+    private InetAddress addr;
+
+    /** Server port. */
+    private int port = DFLT_PORT;
+
+    /** Threads number. */
+    private int threads = DFLT_THREADS;
+
+    /** Direct mode. */
+    private boolean directMode;
+
+    /** Server. */
+    private GridNioServer<byte[]> srv;
+
+    /**
+     * Sets server address.
+     *
+     * @param addr Address.
+     */
+    public void setAddr(InetAddress addr) {
+        this.addr = addr;
+    }
+
+    /**
+     * Sets port number.
+     *
+     * @param port Port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets threadds amount.
+     *
+     * @param threads Threads.
+     */
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets direct mode flag.
+     *
+     * @param directMode Direct mode.
+     */
+    public void setDirectMode(boolean directMode) {
+        this.directMode = directMode;
+    }
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        GridNioServerListener<byte[]> lsnr = new 
GridNioServerListenerAdapter<byte[]>() {
+            @Override public void onConnected(GridNioSession ses) {
+                assert ses.accepted();
+
+                if (log.isDebugEnabled())
+                    log.debug("Accepted connection: " + ses.remoteAddress());
+            }
+
+            @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
+                if (e != null)
+                    log.error("Connection failed with exception", e);
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override public void onMessage(GridNioSession ses, byte[] arr) {
+                try {
+                    T obj = convertMessage(arr);
+
+                    Map.Entry<K, V> e = getTupleExtractor().extract(obj);
+
+                    getStreamer().addData(e);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Can't deserialize message.", e);
+                }
+            }
+        };
+
+        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+        GridNioParser parser = new GridBufferedParser(directMode, byteOrder);
+
+        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+        GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+        try {
+            srv = new GridNioServer.Builder<byte[]>()
+                .address(addr == null ? InetAddress.getLocalHost() : addr)
+                .port(port)
+                .listener(lsnr)
+                .logger(log)
+                .selectorCount(threads)
+                .byteOrder(byteOrder)
+                .filters(filters)
+                .build();
+        }
+        catch (IgniteCheckedException | UnknownHostException e) {
+            throw new IgniteException(e);
+        }
+
+        srv.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server started on " + addr + ':' + 
port);
+    }
+
+    /**
+     * Converts array of bytes to object.
+     *
+     * @param arr Array.
+     */
+    @SuppressWarnings("unchecked")
+    protected T convertMessage(byte[] arr) throws IgniteCheckedException {
+        try (ObjectInputStream is = new ObjectInputStream(new 
ByteArrayInputStream(arr))) {
+            return (T)is.readObject();
+        }
+        catch (IOException | ClassNotFoundException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        srv.stop();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server stopped");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..e1cef65
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains socket streamer implementation.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file

Reply via email to