This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git
The following commit(s) were added to refs/heads/master by this push:
new dc39ee7 [KARAF-5454] - Collector socket - Add UDP protocol support
(#18)
dc39ee7 is described below
commit dc39ee7796135348edb80331e1ca416921c04b05
Author: fpapon <[email protected]>
AuthorDate: Thu Nov 16 18:48:45 2017 +0400
[KARAF-5454] - Collector socket - Add UDP protocol support (#18)
* [KARAF-5454] - Collector socket - Add UDP protocol support
---
.../org.apache.karaf.decanter.collector.socket.cfg | 5 +-
.../decanter/collector/socket/SocketCollector.java | 90 ++++++++++++++++++++--
.../src/main/asciidoc/user-guide/collectors.adoc | 4 +
3 files changed, 93 insertions(+), 6 deletions(-)
diff --git
a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
index b9a04b2..ed67ea7 100644
---
a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
+++
b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
@@ -4,4 +4,7 @@
#port=34343
# Number of worker threads to deal with
-#workers=10
\ No newline at end of file
+#workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
diff --git
a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 49ddb63..3c1e48e 100644
---
a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -17,6 +17,8 @@
package org.apache.karaf.decanter.collector.socket;
import java.io.*;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -49,6 +51,8 @@ public class SocketCollector implements Closeable, Runnable {
private static final Logger LOGGER =
LoggerFactory.getLogger(SocketCollector.class);
private ServerSocket serverSocket;
+ private DatagramSocket datagramSocket;
+ private Protocol protocol;
private EventAdmin eventAdmin;
private boolean open;
private ExecutorService executor;
@@ -56,6 +60,11 @@ public class SocketCollector implements Closeable, Runnable {
private String eventAdminTopic;
private EventAdmin dispatcher;
private Unmarshaller unmarshaller;
+
+ private enum Protocol {
+ TCP,
+ UDP;
+ }
@SuppressWarnings("unchecked")
@Activate
@@ -63,8 +72,24 @@ public class SocketCollector implements Closeable, Runnable {
this.properties = context.getProperties();
int port = Integer.parseInt(getProperty(this.properties, "port",
"34343"));
int workers = Integer.parseInt(getProperty(this.properties, "workers",
"10"));
+
+ this.protocol = Protocol.valueOf(getProperty(this.properties,
"protocol", "tcp").toUpperCase());
+ // force TCP protocol if value not in Enum
+ if (this.protocol == null) {
+ this.protocol = Protocol.TCP;
+ }
+
eventAdminTopic = getProperty(this.properties,
EventConstants.EVENT_TOPIC, "decanter/collect/socket");
- this.serverSocket = new ServerSocket(port);
+
+ switch (protocol) {
+ case TCP:
+ this.serverSocket = new ServerSocket(port);
+ break;
+ case UDP:
+ this.datagramSocket = new DatagramSocket(port);
+ break;
+ }
+
// adding 1 for serverSocket handling
this.executor = Executors.newFixedThreadPool(workers + 1);
this.executor.execute(this);
@@ -79,9 +104,21 @@ public class SocketCollector implements Closeable, Runnable
{
public void run() {
while (open) {
try {
- Socket socket = serverSocket.accept();
- LOGGER.debug("Connected to client at {}",
socket.getInetAddress());
- this.executor.execute(new SocketRunnable(socket));
+ switch (protocol) {
+ case TCP:
+ Socket socket = serverSocket.accept();
+ LOGGER.debug("Connected to TCP client at {}",
socket.getInetAddress());
+ this.executor.execute(new SocketRunnable(socket));
+ break;
+
+ case UDP:
+ byte[] buffer = new byte[1024];
+ DatagramPacket packet = new DatagramPacket(buffer,
buffer.length);
+ LOGGER.debug("Connected to UDP client at {}",
datagramSocket.getLocalSocketAddress());
+ datagramSocket.receive(packet);
+ this.executor.execute(new DatagramRunnable(packet));
+ break;
+ }
} catch (IOException e) {
LOGGER.warn("Exception receiving log.", e);
}
@@ -103,7 +140,15 @@ public class SocketCollector implements Closeable,
Runnable {
} catch (Exception e) {
// nothing to do
}
- serverSocket.close();
+ switch (protocol) {
+ case TCP:
+ serverSocket.close();
+ break;
+
+ case UDP:
+ datagramSocket.close();
+ break;
+ }
}
@Reference
@@ -148,6 +193,41 @@ public class SocketCollector implements Closeable,
Runnable {
}
}
}
+
+    /**
+     * Worker task that converts one received UDP datagram into an event.
+     *
+     * The event always carries the local host address/name, the "socket" type and,
+     * when the "karaf.name" system property is set, the Karaf instance name. The
+     * datagram payload is unmarshalled and merged into the event data; if the
+     * payload cannot be unmarshalled the event is still posted with the base data.
+     */
+    private class DatagramRunnable implements Runnable {
+
+        private final DatagramPacket packet;
+
+        /**
+         * @param packet the datagram to process, as filled in by the receiving socket
+         */
+        public DatagramRunnable(DatagramPacket packet) {
+            this.packet = packet;
+        }
+
+        @Override
+        public void run() {
+            // Only getLength() bytes starting at getOffset() belong to the datagram;
+            // the rest of the backing buffer is unused padding and must not be
+            // handed to the unmarshaller.
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(
+                    packet.getData(), packet.getOffset(), packet.getLength())) {
+                Map<String, Object> data = new HashMap<>();
+                InetAddress localHost = InetAddress.getLocalHost();
+                data.put("hostAddress", localHost.getHostAddress());
+                data.put("hostName", localHost.getHostName());
+                data.put("type", "socket");
+                String karafName = System.getProperty("karaf.name");
+                if (karafName != null) {
+                    data.put("karafName", karafName);
+                }
+                try {
+                    data.putAll(unmarshaller.unmarshal(bais));
+                } catch (Exception e) {
+                    // Best effort: keep posting the event with the base data, but
+                    // leave a trace instead of hiding the failure.
+                    LOGGER.warn("Unable to unmarshal UDP datagram payload", e);
+                }
+                Event event = new Event(eventAdminTopic, data);
+                dispatcher.postEvent(event);
+                // Send the same packet back through the collector socket; after
+                // receive() the packet holds the sender's address and port.
+                datagramSocket.send(packet);
+            } catch (EOFException e) {
+                LOGGER.warn("Client closed the connection", e);
+            } catch (IOException e) {
+                LOGGER.warn("Exception receiving data", e);
+            }
+        }
+    }
@Reference
public void setDispatcher(EventAdmin dispatcher) {
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc
b/manual/src/main/asciidoc/user-guide/collectors.adoc
index c236409..8a8980d 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -469,10 +469,14 @@ This feature installs a default
`etc/org.apache.karaf.decanter.collector.socket.
# Number of worker threads to deal with
#workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
----
* the `port` property contains the port number where the network socket
collector is listening
* the `workers` property contains the number of worker threads the socket
collector uses to handle connections
+* the `protocol` property selects the protocol (`tcp`, the default, or `udp`)
the collector uses for transferring data with the client
==== JMS
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].