Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430-1 ff30e7a94 -> fd401bfb9


ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fd401bfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fd401bfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fd401bfb

Branch: refs/heads/ignite-430-1
Commit: fd401bfb99c7a8fd8b5c297f8eb15e330573366b
Parents: ff30e7a
Author: agura <[email protected]>
Authored: Mon Apr 13 22:52:15 2015 +0300
Committer: agura <[email protected]>
Committed: Mon Apr 13 22:52:15 2015 +0300

----------------------------------------------------------------------
 .../ZeroTerminatedSocketStreamerExample.java    | 144 +++++++++++++++++++
 .../internal/util/nio/GridDelimitedParser.java  |  90 ++++++++++++
 .../util/nio/GridNioDelimitedBuffer.java        | 101 +++++++++++++
 .../stream/socket/IgniteSocketStreamer.java     |  18 ++-
 .../util/nio/GridNioDelimitedBufferTest.java    |  25 ++++
 5 files changed, 376 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java
 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java
new file mode 100644
index 0000000..a505f40
--- /dev/null
+++ 
b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * Messages are ASCII strings of the form {@code key=value}, each terminated by a single zero byte.
+ * To start the example, you should:
+ * <ul>
+ *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *      <li>Start streaming using {@link ZeroTerminatedSocketStreamerExample}.</li>
+ *      <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZeroTerminatedSocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter (single zero byte) appended after every message. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * Starts a client node, configures the socket streamer and then feeds it with data over a local socket.
+     *
+     * @param args Args.
+     * @throws InterruptedException If interrupted.
+     * @throws IOException If sending data over the socket failed.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of the streaming data.
+            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the same number.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<String, Integer, Long> sockStmr =
+                    new IgniteSocketStreamer<String, Integer, Long>() {
+                        @Override protected String convertMessage(byte[] arr) throws IgniteCheckedException {
+                            try {
+                                return new String(arr, "ASCII");
+                            }
+                            catch (UnsupportedEncodingException e) {
+                                // Report the failure through the declared exception instead of printing
+                                // the stack trace and handing a null message to the rest of the pipeline.
+                                throw new IgniteCheckedException("Failed to decode message as ASCII.", e);
+                            }
+                        }
+                    };
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setDelimiter(DELIM);
+
+                sockStmr.setStreamer(stmr);
+
+                // Messages have the form "key=value".
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(String input) {
+                        String[] pair = input.split("=");
+
+                        return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * Connects to the given address and endlessly writes messages of the form {@code <random key>=1},
+     * each followed by {@link #DELIM}. Returns only by throwing an exception.
+     *
+     * @param addr Address.
+     * @param port Port.
+     * @throws IOException If connecting or writing to the socket failed.
+     * @throws InterruptedException If interrupted.
+     */
+    private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+            while (true) {
+                int key = RAND.nextInt(RANGE);
+
+                String str = key + "=1";
+
+                byte[] arr = str.getBytes("ASCII");
+
+                oos.write(arr);
+                oos.write(DELIM);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
new file mode 100644
index 0000000..9b7d892
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is followed by
+ * the configured delimiter. So, the stream structure is as follows:
+ * <pre>
+ *     +--+--+...+--+--+--+--+--+--+...+--+--+--+
+ *     |   MESSAGE  | DELIM |   MESSAGE  | DELIM |
+ *     +--+--+...+--+--+--+--+--+--+...+--+--+--+
+ * </pre>
+ */
+public class GridDelimitedParser implements GridNioParser {
+    /** Session metadata key under which the per-session buffer is stored. */
+    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Whether encoded messages are placed into direct byte buffers. */
+    private final boolean directBuf;
+
+    /**
+     * @param delim Delimiter.
+     * @param directBuf Direct buffer.
+     */
+    public GridDelimitedParser(byte[] delim, boolean directBuf) {
+        this.delim = delim;
+        this.directBuf = directBuf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+        GridNioDelimitedBuffer sesBuf = ses.meta(BUF_META_KEY);
+
+        if (sesBuf != null)
+            return sesBuf.read(buf);
+
+        // First call for this session: create the buffer and attach it to the session.
+        // Decode for a given session is called per one thread, so there should not be any
+        // concurrency issues. However, we make an additional check.
+        sesBuf = new GridNioDelimitedBuffer(delim);
+
+        GridNioDelimitedBuffer prev = ses.addMeta(BUF_META_KEY, sesBuf);
+
+        assert prev == null;
+
+        return sesBuf.read(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+        byte[] payload = (byte[])msg;
+
+        // Room for the message itself plus the trailing delimiter.
+        int cap = payload.length + delim.length;
+
+        ByteBuffer out;
+
+        if (directBuf)
+            out = ByteBuffer.allocateDirect(cap);
+        else
+            out = ByteBuffer.allocate(cap);
+
+        out.put(payload).put(delim);
+
+        out.flip();
+
+        return out;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
new file mode 100644
index 0000000..0724f99
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Accumulates bytes read from {@link ByteBuffer}s and cuts them into messages separated by a delimiter.
+ * <p>
+ * Bytes that do not yet form a complete message are kept between {@link #read(ByteBuffer)} calls.
+ */
+public class GridNioDelimitedBuffer {
+    /** Initial buffer size. */
+    private static final int BUFFER_SIZE = 512;
+
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Bytes accumulated for the current message. */
+    private byte[] data;
+
+    /** Number of accumulated bytes. */
+    private int cnt;
+
+    /**
+     * @param delim Delimiter, must be non-null and non-empty.
+     */
+    public GridNioDelimitedBuffer(byte[] delim) {
+        assert delim != null;
+        assert delim.length > 0;
+
+        this.delim = delim;
+
+        reset();
+    }
+
+    /**
+     * Discards all accumulated bytes and shrinks the internal buffer back to its initial size.
+     */
+    public void reset() {
+        cnt = 0;
+
+        data = new byte[BUFFER_SIZE];
+    }
+
+    /**
+     * Consumes bytes from the given buffer until a complete message is available.
+     *
+     * @param buf Buffer to read from. Bytes following the first delimiter are left in the buffer.
+     * @return Message bytes without the delimiter (possibly an empty array), or {@code null} if the buffer
+     *      was exhausted before a delimiter was seen; in that case the consumed bytes are retained and
+     *      the message is continued on the next call.
+     * @throws IgniteCheckedException Declared for callers; not thrown by this implementation.
+     */
+    @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException {
+        while (buf.hasRemaining()) {
+            if (cnt == data.length)
+                data = Arrays.copyOf(data, data.length * 2);
+
+            data[cnt++] = buf.get();
+
+            if (endsWithDelimiter()) {
+                byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
+
+                reset();
+
+                return bytes;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Checks whether the accumulated bytes end with the delimiter.
+     * <p>
+     * The check runs after every appended byte, so looking only at the tail is enough to detect
+     * the first delimiter occurrence. No scan position is carried between calls, so a partial match
+     * can never cause a later delimiter (for example delimiter {@code {0, 1, 1}} in {@code {x, 0, 1, 1}})
+     * to be skipped.
+     *
+     * @return {@code True} if the last {@code delim.length} accumulated bytes equal the delimiter.
+     */
+    private boolean endsWithDelimiter() {
+        int off = cnt - delim.length;
+
+        if (off < 0)
+            return false;
+
+        for (int i = 0; i < delim.length; i++) {
+            if (data[off + i] != delim[i])
+                return false;
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
index 18f1748..05281e9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
@@ -19,6 +19,8 @@ package org.apache.ignite.stream.socket;
 
 import org.apache.ignite.*;
 import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.logger.java.*;
 import org.apache.ignite.stream.adapters.*;
 import org.jetbrains.annotations.*;
@@ -58,6 +60,9 @@ public class IgniteSocketStreamer<T, K, V> extends 
StreamAdapter<T, K, V> {
     /** Direct mode. */
     private boolean directMode;
 
+    /** Delimiter. */
+    private byte[] delim;
+
     /** Server. */
     private GridNioServer<byte[]> srv;
 
@@ -98,6 +103,13 @@ public class IgniteSocketStreamer<T, K, V> extends 
StreamAdapter<T, K, V> {
     }
 
     /**
+     * Sets the byte sequence used to separate messages in the incoming stream.
+     *
+     * @param delim Delimiter. NOTE(review): when {@code F.isEmpty(delim)} holds, {@code start()} appears to
+     *      fall back to {@code GridBufferedParser} instead of {@code GridDelimitedParser} - confirm that
+     *      this covers both {@code null} and zero-length arrays.
+     */
+    public void setDelimiter(byte[] delim) {
+        this.delim = delim;
+    }
+
+    /**
      * Starts streamer.
      *
      * @throws IgniteException If failed.
@@ -133,7 +145,8 @@ public class IgniteSocketStreamer<T, K, V> extends 
StreamAdapter<T, K, V> {
 
         ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
 
-        GridNioParser parser = new GridBufferedParser(directMode, byteOrder);
+        GridNioParser parser = F.isEmpty(delim) ? new 
GridBufferedParser(directMode, byteOrder) :
+            new GridDelimitedParser(delim, directMode);
 
         GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
 
@@ -167,7 +180,8 @@ public class IgniteSocketStreamer<T, K, V> extends 
StreamAdapter<T, K, V> {
      */
     @SuppressWarnings("unchecked")
     protected T convertMessage(byte[] arr) throws IgniteCheckedException {
-        try (ObjectInputStream is = new ObjectInputStream(new 
ByteArrayInputStream(arr))) {
+        try (ByteArrayInputStream in = new ByteArrayInputStream(arr);
+             ObjectInputStream is = new ObjectInputStream(in)) {
             return (T)is.readObject();
         }
         catch (IOException | ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
new file mode 100644
index 0000000..b02f485
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
@@ -0,0 +1,25 @@
+package org.apache.ignite.internal.util.nio;
+
+import junit.framework.TestCase;
+
+import java.nio.*;
+
+/**
+ * Tests for {@link GridNioDelimitedBuffer}.
+ */
+public class GridNioDelimitedBufferTest extends TestCase {
+    /**
+     * Checks that two delimited messages contained in a single buffer are returned one by one
+     * without the delimiter, and that {@code null} is returned once the buffer is exhausted.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRead() throws Exception {
+        byte[] delim = new byte[] {0, 1, 2};
+
+        byte[] arr = new byte[] {'p', 'r', 'b', 0, 1, 2, 'h', 'e', 'l', 'l', 'o', 0, 1, 2};
+
+        ByteBuffer bf = ByteBuffer.wrap(arr);
+
+        GridNioDelimitedBuffer buf = new GridNioDelimitedBuffer(delim);
+
+        byte[] first = buf.read(bf);
+
+        assertNotNull(first);
+        assertEquals("prb", new String(first, "US-ASCII"));
+
+        byte[] second = buf.read(bf);
+
+        assertNotNull(second);
+        assertEquals("hello", new String(second, "US-ASCII"));
+
+        assertNull(buf.read(bf));
+        assertFalse(bf.hasRemaining());
+    }
+
+    /**
+     * Checks that a message whose delimiter is split across two buffers is assembled correctly.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReadSplitAcrossBuffers() throws Exception {
+        byte[] delim = new byte[] {0, 1, 2};
+
+        GridNioDelimitedBuffer buf = new GridNioDelimitedBuffer(delim);
+
+        // The first chunk ends in the middle of the delimiter, so no message is available yet.
+        assertNull(buf.read(ByteBuffer.wrap(new byte[] {'a', 'b', 0})));
+
+        byte[] msg = buf.read(ByteBuffer.wrap(new byte[] {1, 2}));
+
+        assertNotNull(msg);
+        assertEquals("ab", new String(msg, "US-ASCII"));
+    }
+}
+}
\ No newline at end of file

Reply via email to