Repository: incubator-ignite Updated Branches: refs/heads/ignite-430-1 fd401bfb9 -> 3954773b4 (forced update)
ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3954773b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3954773b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3954773b Branch: refs/heads/ignite-430-1 Commit: 3954773b4507855c2dd4bc0567ad95eefa30dba1 Parents: 742bd4a Author: agura <[email protected]> Authored: Mon Apr 13 18:28:40 2015 +0300 Committer: agura <[email protected]> Committed: Tue Apr 14 02:18:22 2015 +0300 ---------------------------------------------------------------------- .../streaming/socket/SocketStreamerExample.java | 156 ++++++++++++++ .../socket/ZStringsSocketStreamerExample.java | 149 +++++++++++++ .../examples/streaming/socket/package-info.java | 21 ++ .../internal/util/nio/GridDelimitedParser.java | 91 ++++++++ .../util/nio/GridNioDelimitedBuffer.java | 98 +++++++++ .../stream/socket/IgniteSocketStreamer.java | 214 +++++++++++++++++++ .../stream/socket/SocketMessageConverter.java | 32 +++ .../ignite/stream/socket/package-info.java | 21 ++ 8 files changed, 782 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java new file mode 100644 index 0000000..1ee916f --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.adapters.*; +import org.apache.ignite.stream.socket.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}. + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link SocketStreamerExample}.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class SocketStreamerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** Port. */ + private static final int PORT = 5555; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + InetAddress addr = InetAddress.getLocalHost(); + + IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new IgniteSocketStreamer<>(); + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setStreamer(stmr); + + sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() { + @Override public Map.Entry<Integer, Long> extract(Tuple tuple) { + return new IgniteBiTuple<>(tuple.key, tuple.cnt); + } + }); + + sockStmr.start(); + + sendData(addr, PORT); + } + } + } + + /** + * @param addr Address. + * @param port Port. + */ + private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { + try (Socket sock = new Socket(addr, port); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + while (true) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + Tuple tuple = new Tuple(RAND.nextInt(RANGE), 1L); + + out.writeObject(tuple); + + byte[] arr = bos.toByteArray(); + + oos.write(arr.length >>> 24); + oos.write(arr.length >>> 16); + oos.write(arr.length >>> 8); + oos.write(arr.length); + + oos.write(arr); + } + } + } + } + + /** + * Tuple. + */ + private static class Tuple implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0; + + /** Key. */ + private final int key; + + /** Count. */ + private final long cnt; + + /** + * @param key Key. + * @param cnt Count. + */ + public Tuple(int key, long cnt) { + this.key = key; + this.cnt = cnt; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java new file mode 100644 index 0000000..9fd229e --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.adapters.*; +import org.apache.ignite.stream.socket.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}. + * <p> + * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams + * zero-terminated strings. + * <p> + * To start the example, you should: + * <ul> + * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li> + * <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li> + * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li> + * </ul> + * <p> + * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class ZStringsSocketStreamerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** Port. */ + private static final int PORT = 5555; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0}; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer<Integer, Long>() { + @Override public Object process(MutableEntry<Integer, Long> e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + InetAddress addr = InetAddress.getLocalHost(); + + IgniteSocketStreamer<String, Integer, Long> sockStmr = new IgniteSocketStreamer<>(); + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setDelimiter(DELIM); + + sockStmr.setStreamer(stmr); + + // Converter from zero-terminated string to Java strings. + sockStmr.setConverter(new SocketMessageConverter<String>() { + @Override public String convert(byte[] msg) { + try { + return new String(msg, "ASCII"); + } + catch (UnsupportedEncodingException e) { + throw new IgniteException(e); + } + } + }); + + sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() { + @Override public Map.Entry<Integer, Long> extract(String input) { + String[] pair = input.split("="); + return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1])); + } + }); + + sockStmr.start(); + + sendData(addr, PORT); + } + } + } + + /** + * @param addr Address. + * @param port Port. + */ + private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { + try (Socket sock = new Socket(addr, port); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + + while (true) { + int key = RAND.nextInt(RANGE); + + String str = key + "=1"; + + byte[] arr = str.getBytes("ASCII"); + + oos.write(arr); + oos.write(DELIM); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java new file mode 100644 index 0000000..ae7bdf9 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples. + */ +package org.apache.ignite.examples.streaming.socket; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java new file mode 100644 index 0000000..d49ddd4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.*; + +import java.io.*; +import java.nio.*; + +/** + * This class implements stream parser based on {@link GridNioDelimitedBuffer}. + * <p> + * The rule for this parser is that every message sent over the stream is appended with + * delimiter (bytes array). So, the stream structure is as follows: + * <pre> + * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+- + * | MESSAGE | DELIMITER | MESSAGE | DELIMITER | + * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+- + * </pre> + */ +public class GridDelimitedParser implements GridNioParser { + /** Buffer metadata key. */ + private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Delimiter. */ + private final byte[] delim; + + /** Direct buffer. */ + private final boolean directBuf; + + /** + * @param delim Delimiter. + * @param directBuf Direct buffer. + */ + public GridDelimitedParser(byte[] delim, boolean directBuf) { + this.delim = delim; + this.directBuf = directBuf; + } + + /** {@inheritDoc} */ + @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY); + + // Decode for a given session is called per one thread, so there should not be any concurrency issues. + // However, we make some additional checks. + if (nioBuf == null) { + nioBuf = new GridNioDelimitedBuffer(delim); + + GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf); + + assert old == null; + } + + return nioBuf.read(buf); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + byte[] msg0 = (byte[])msg; + + int capacity = msg0.length + delim.length; + ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity); + + res.put(msg0); + res.put(delim); + + res.flip(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return this.getClass().getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java new file mode 100644 index 0000000..5f895ee --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.jetbrains.annotations.*; + +import java.nio.*; +import java.util.*; + +/** + * Buffer with message delimiter support. + */ +public class GridNioDelimitedBuffer { + /** Buffer size. */ + private static final int BUFFER_SIZE = 512; + + /** Delimiter. */ + private final byte[] delim; + + /** Data. */ + private byte[] data; + + /** Count. */ + private int cnt; + + /** + * @param delim Delimiter. + */ + public GridNioDelimitedBuffer(byte[] delim) { + assert delim != null; + assert delim.length > 0; + + this.delim = delim; + + reset(); + } + + private void reset() { + cnt = 0; + + data = new byte[BUFFER_SIZE]; + } + + /** + * @param buf Buffer. + * @return Message bytes or {@code null} if message is not fully read yet. + */ + @Nullable public byte[] read(ByteBuffer buf) { + for(; buf.hasRemaining();) { + + if (cnt == data.length) + data = Arrays.copyOf(data, data.length * 2); + + data[cnt++] = buf.get(); + + if (cnt >= delim.length && found()) { + byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length); + + reset(); + + return bytes; + } + } + + return null; + } + + /** + * Tries find delimiter in buffer. + * + * @return {@code True} if delimiter found, {@code false} - otherwise. + */ + private boolean found() { + int from = cnt - delim.length; + + for (int i = 0; i < delim.length ; i++) { + if (delim[i] != data[from + i]) + return false; + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java new file mode 100644 index 0000000..43754ff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.logger.java.*; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.stream.adapters.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.nio.*; +import java.util.*; + +/** + * Server that receives data from TCP socket, converts it to key-value pairs and streams into + * {@link IgniteDataStreamer} instance. + */ +public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> { + /** Default port. */ + private static final int DFLT_PORT = 5555; + + /** Default threads. */ + private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors(); + + /** Logger. */ + private final IgniteLogger log = new JavaLogger(false) { + @Override public boolean isDebugEnabled() { + return true; + } + }; + + /** Address. */ + private InetAddress addr; + + /** Server port. */ + private int port = DFLT_PORT; + + /** Threads number. */ + private int threads = DFLT_THREADS; + + /** Direct mode. */ + private boolean directMode; + + /** Delimiter. */ + private byte[] delim; + + /** Converter. */ + private SocketMessageConverter<T> converter; + + /** Server. */ + private GridNioServer<byte[]> srv; + + /** + * Sets server address. + * + * @param addr Address. + */ + public void setAddr(InetAddress addr) { + this.addr = addr; + } + + /** + * Sets port number. + * + * @param port Port. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Sets threadds amount. + * + * @param threads Threads. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets direct mode flag. + * + * @param directMode Direct mode. + */ + public void setDirectMode(boolean directMode) { + this.directMode = directMode; + } + + /** + * Sets message delimiter. + * + * @param delim Delimiter. + */ + public void setDelimiter(byte[] delim) { + this.delim = delim; + } + + /** + * Sets message converter. + * + * @param converter Converter. + */ + public void setConverter(SocketMessageConverter<T> converter) { + this.converter = converter; + } + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() { + GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() { + @Override public void onConnected(GridNioSession ses) { + assert ses.accepted(); + + if (log.isDebugEnabled()) + log.debug("Accepted connection: " + ses.remoteAddress()); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (e != null) + log.error("Connection failed with exception", e); + } + + @Override public void onMessage(GridNioSession ses, byte[] msg) { + T obj = converter.convert(msg); + + Map.Entry<K, V> e = getTupleExtractor().extract(obj); + + getStreamer().addData(e); + } + }; + + ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) : + new GridDelimitedParser(delim, directMode); + + if (converter == null) + converter = new DefaultConverter<>(); + + GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); + + GridNioFilter[] filters = new GridNioFilter[] {codec}; + + try { + srv = new GridNioServer.Builder<byte[]>() + .address(addr == null ? InetAddress.getLocalHost() : addr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(threads) + .byteOrder(byteOrder) + .filters(filters) + .build(); + } + catch (IgniteCheckedException | UnknownHostException e) { + throw new IgniteException(e); + } + + srv.start(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server started on " + addr + ':' + port); + } + + /** + * Stops streamer. + */ + public void stop() { + srv.stop(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server stopped"); + } + + /** + * Converts message to Java object using Jdk marshaller. + */ + private static class DefaultConverter<T> implements SocketMessageConverter<T> { + /** Marshaller. */ + private static final JdkMarshaller MARSH = new JdkMarshaller(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public T convert(byte[] msg) { + try { + return MARSH.unmarshal(msg, null); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java new file mode 100644 index 0000000..0a70a33 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +/** + * Socket message converter. + */ +public interface SocketMessageConverter<T> { + + /** + * Converter message represented by array of bytes to object. + * + * @param msg Message. + * @return Converted object. + */ + public T convert(byte[] msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3954773b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java new file mode 100644 index 0000000..e1cef65 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains socket streamer implementation. + */ +package org.apache.ignite.stream.socket; \ No newline at end of file
