Repository: incubator-ignite Updated Branches: refs/heads/ignite-430-1 [created] ff30e7a94
ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff30e7a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff30e7a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff30e7a9 Branch: refs/heads/ignite-430-1 Commit: ff30e7a941484d55886e25c52ae111c2cf022c28 Parents: 742bd4a Author: agura <[email protected]> Authored: Mon Apr 13 18:28:40 2015 +0300 Committer: agura <[email protected]> Committed: Mon Apr 13 19:06:26 2015 +0300 ---------------------------------------------------------------------- .../streaming/socket/SocketStreamerExample.java | 155 +++++++++++++++ .../examples/streaming/socket/package-info.java | 21 +++ .../stream/socket/IgniteSocketStreamer.java | 187 +++++++++++++++++++ .../ignite/stream/socket/package-info.java | 21 +++ 4 files changed, 384 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java new file mode 100644 index 0000000..abaf2c5 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.adapters.*; +import org.apache.ignite.stream.socket.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}. To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link SocketStreamerExample}.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class SocketStreamerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** Port. */ + private static final int PORT = 5555; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. 
+ Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same number. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + InetAddress addr = InetAddress.getLocalHost(); + + IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new IgniteSocketStreamer<>(); + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setStreamer(stmr); + + sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() { + @Override public Map.Entry<Integer, Long> extract(Tuple tuple) { + return new IgniteBiTuple<>(tuple.key, tuple.cnt); + } + }); + + sockStmr.start(); + + sendData(addr, PORT); + } + } + } + + /** + * @param addr Address. + * @param port Port. 
+ */ + private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { + try (Socket sock = new Socket(addr, port); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + while (true) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + Tuple tup = new Tuple(RAND.nextInt(RANGE), 1L); + + out.writeObject(tup); + + byte[] arr = bos.toByteArray(); + + oos.write(arr.length >>> 24); + oos.write(arr.length >>> 16); + oos.write(arr.length >>> 8); + oos.write(arr.length); + + oos.write(arr); + } + } + } + } + + /** + * Tuple. + */ + private static class Tuple implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0; + + /** Key. */ + private final int key; + + /** Count. */ + private final long cnt; + + /** + * @param key Key. + * @param cnt Count. + */ + public Tuple(int key, long cnt) { + this.key = key; + this.cnt = cnt; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java new file mode 100644 index 0000000..ae7bdf9 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples. + */ +package org.apache.ignite.examples.streaming.socket; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java new file mode 100644 index 0000000..18f1748 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.stream.adapters.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.nio.*; +import java.util.*; + +/** + * TCP socket server that receives data from network, converts it to key-value pairs and streams into + * {@link IgniteDataStreamer} instance. + */ +public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { + /** Default port. */ + private static final int DFLT_PORT = 5555; + + /** Default threads. */ + private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors(); + + /** Logger. */ + private final IgniteLogger log = new JavaLogger(false) { + @Override public boolean isDebugEnabled() { + return true; + } + }; + + /** Address. */ + private InetAddress addr; + + /** Server port. */ + private int port = DFLT_PORT; + + /** Threads number. */ + private int threads = DFLT_THREADS; + + /** Direct mode. */ + private boolean directMode; + + /** Server. */ + private GridNioServer<byte[]> srv; + + /** + * Sets server address. + * + * @param addr Address. + */ + public void setAddr(InetAddress addr) { + this.addr = addr; + } + + /** + * Sets port number. + * + * @param port Port. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Sets threads amount. + * + * @param threads Threads. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets direct mode flag. + * + * @param directMode Direct mode. + */ + public void setDirectMode(boolean directMode) { + this.directMode = directMode; + } + + /** + * Starts streamer. + * + * @throws IgniteException If failed. 
+ */ + public void start() { + GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() { + @Override public void onConnected(GridNioSession ses) { + assert ses.accepted(); + + if (log.isDebugEnabled()) + log.debug("Accepted connection: " + ses.remoteAddress()); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (e != null) + log.error("Connection failed with exception", e); + } + + @SuppressWarnings("unchecked") + @Override public void onMessage(GridNioSession ses, byte[] arr) { + try { + T obj = convertMessage(arr); + + Map.Entry<K, V> e = getTupleExtractor().extract(obj); + + getStreamer().addData(e); + } + catch (IgniteCheckedException e) { + log.error("Can't deserialize message.", e); + } + } + }; + + ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + GridNioParser parser = new GridBufferedParser(directMode, byteOrder); + + GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); + + GridNioFilter[] filters = new GridNioFilter[] {codec}; + + try { + srv = new GridNioServer.Builder<byte[]>() + .address(addr == null ? InetAddress.getLocalHost() : addr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(threads) + .byteOrder(byteOrder) + .filters(filters) + .build(); + } + catch (IgniteCheckedException | UnknownHostException e) { + throw new IgniteException(e); + } + + srv.start(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server started on " + addr + ':' + port); + } + + /** + * Converts array of bytes to object. + * + * @param arr Array. + */ + @SuppressWarnings("unchecked") + protected T convertMessage(byte[] arr) throws IgniteCheckedException { + try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(arr))) { + return (T)is.readObject(); + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Stops streamer. 
+ */ + public void stop() { + srv.stop(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server stopped"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff30e7a9/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java new file mode 100644 index 0000000..e1cef65 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains socket streamer implementation. + */ +package org.apache.ignite.stream.socket; \ No newline at end of file
