Repository: incubator-ignite Updated Branches: refs/heads/ignite-430-1 ff30e7a94 -> fd401bfb9
ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fd401bfb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fd401bfb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fd401bfb Branch: refs/heads/ignite-430-1 Commit: fd401bfb99c7a8fd8b5c297f8eb15e330573366b Parents: ff30e7a Author: agura <[email protected]> Authored: Mon Apr 13 22:52:15 2015 +0300 Committer: agura <[email protected]> Committed: Mon Apr 13 22:52:15 2015 +0300 ---------------------------------------------------------------------- .../ZeroTerminatedSocketStreamerExample.java | 144 +++++++++++++++++++ .../internal/util/nio/GridDelimitedParser.java | 90 ++++++++++++ .../util/nio/GridNioDelimitedBuffer.java | 101 +++++++++++++ .../stream/socket/IgniteSocketStreamer.java | 18 ++- .../util/nio/GridNioDelimitedBufferTest.java | 25 ++++ 5 files changed, 376 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java new file mode 100644 index 0000000..a505f40 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZeroTerminatedSocketStreamerExample.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.adapters.*; +import org.apache.ignite.stream.socket.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}. To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link SocketStreamerExample}.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class ZeroTerminatedSocketStreamerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** Port. */ + private static final int PORT = 5555; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0}; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + InetAddress addr = InetAddress.getLocalHost(); + + IgniteSocketStreamer<String, Integer, Long> sockStmr = + new IgniteSocketStreamer<String, Integer, Long>() { + @Override protected String convertMessage(byte[] arr) throws IgniteCheckedException { + try { + String str = new String(arr, "ASCII"); + return str; + } + catch (UnsupportedEncodingException e) { + e.printStackTrace(); + return null; + } + } + }; + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setDelimiter(DELIM); + + sockStmr.setStreamer(stmr); + + sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() { + @Override public Map.Entry<Integer, Long> extract(String input) { + String[] pair = input.split("="); + return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1])); + } + }); + + sockStmr.start(); + + sendData(addr, PORT); + } + } + } + + /** + * @param addr Address. + * @param port Port. + */ + private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { + try (Socket sock = new Socket(addr, port); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + + while (true) { + int key = RAND.nextInt(RANGE); + + String str = key + "=1"; + + byte[] arr = str.getBytes("ASCII"); + + oos.write(arr); + oos.write(DELIM); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java new file mode 100644 index 0000000..9b7d892 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.nio.*; + +/** + * This class implements stream parser based on {@link GridNioServerBuffer}. + * <p> + * The rule for this parser is that every message sent over the stream is prepended with + * 4-byte integer header containing message size. So, the stream structure is as follows: + * <pre> + * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ + * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE | + * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ + * </pre> + * <p> + * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream, + * isn't equal to these bytes than exception will be thrown. + */ +public class GridDelimitedParser implements GridNioParser { + /** Buffer metadata key. */ + private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Delimiter. */ + private final byte[] delim; + + /** Direct buffer. */ + private final boolean directBuf; + + /** + * @param delim Delimiter. + * @param directBuf Direct buffer. + */ + public GridDelimitedParser(byte[] delim, boolean directBuf) { + this.delim = delim; + this.directBuf = directBuf; + } + + /** {@inheritDoc} */ + @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY); + + // Decode for a given session is called per one thread, so there should not be any concurrency issues. + // However, we make some additional checks. + if (nioBuf == null) { + nioBuf = new GridNioDelimitedBuffer(delim); + + GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf); + + assert old == null; + } + + return nioBuf.read(buf); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + byte[] msg0 = (byte[])msg; + + ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length + delim.length) : ByteBuffer.allocate(msg0.length + delim.length); + + res.put(msg0); + res.put(delim); + + res.flip(); + + return res; + + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java new file mode 100644 index 0000000..0724f99 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +import java.nio.*; +import java.util.*; + +public class GridNioDelimitedBuffer { + /** Buffer size. */ + private static final int BUFFER_SIZE = 512; + + /** Delimiter. */ + private final byte[] delim; + + /** Data. */ + private byte[] data; + + /** Count. */ + private int cnt; + + /** Position. */ + private int pos; + + /** + * @param delim Delimiter. + */ + public GridNioDelimitedBuffer(byte[] delim) { + assert delim != null; + assert delim.length > 0; + + this.delim = delim; + + reset(); + } + + public void reset() { + cnt = pos = 0; + + data = new byte[BUFFER_SIZE]; + } + + @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException { + for(; buf.hasRemaining();) { + + if (cnt == data.length) + data = Arrays.copyOf(data, data.length * 2); + + data[cnt++] = buf.get(); + + if (cnt >= delim.length && found()) { + byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length); + + reset(); + + return bytes; + } + } + + return null; + } + + private boolean found() { + for (int i = pos; i <= cnt - delim.length; i++) { + int matchedCnt = 0; + + for (int j = 0; j < delim.length; j++) { + int idx = i + j; + + if (data[idx] == delim[j]) { + matchedCnt++; + + pos = idx; + } + } + + + if (matchedCnt == delim.length) + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java index 18f1748..05281e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java @@ -19,6 +19,8 @@ package org.apache.ignite.stream.socket; import org.apache.ignite.*; import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.logger.java.*; import org.apache.ignite.stream.adapters.*; import org.jetbrains.annotations.*; @@ -58,6 +60,9 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { /** Direct mode. */ private boolean directMode; + /** Delimiter. */ + private byte[] delim; + /** Server. */ private GridNioServer<byte[]> srv; @@ -98,6 +103,13 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { } /** + * @param delim Delimiter. + */ + public void setDelimiter(byte[] delim) { + this.delim = delim; + } + + /** * Starts streamer. * * @throws IgniteException If failed. @@ -133,7 +145,8 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; - GridNioParser parser = new GridBufferedParser(directMode, byteOrder); + GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) : + new GridDelimitedParser(delim, directMode); GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); @@ -167,7 +180,8 @@ public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { */ @SuppressWarnings("unchecked") protected T convertMessage(byte[] arr) throws IgniteCheckedException { - try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(arr))) { + try (ByteArrayInputStream in = new ByteArrayInputStream(arr); + ObjectInputStream is = new ObjectInputStream(in)) { return (T)is.readObject(); } catch (IOException | ClassNotFoundException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fd401bfb/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java new file mode 100644 index 0000000..b02f485 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java @@ -0,0 +1,25 @@ +package org.apache.ignite.internal.util.nio; + +import junit.framework.TestCase; + +import java.nio.*; + +public class GridNioDelimitedBufferTest extends TestCase { + + public void testRead() throws Exception { + byte[] delim = new byte[] {0, 1, 2}; + + byte[] arr = new byte[] {'p', 'r', 'b', 0, 1, 2, 'h', 'e', 'l', 'l', 'o', 0, 1, 2}; + + ByteBuffer bf = ByteBuffer.wrap(arr); + + GridNioDelimitedBuffer buf = new GridNioDelimitedBuffer(delim); + + byte[] read; + while ((read = buf.read(bf)) != null) { + System.out.println("Lengh: " + read.length); + String str = new String(read); + System.out.println(str); + } + } +} \ No newline at end of file
