[
https://issues.apache.org/jira/browse/KARAF-5454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16255439#comment-16255439
]
ASF GitHub Bot commented on KARAF-5454:
---------------------------------------
jbonofre closed pull request #18: [KARAF-5454] - Collector socket - Add UDP
protocol support
URL: https://github.com/apache/karaf-decanter/pull/18
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
index b9a04b2..ed67ea7 100644
---
a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
+++
b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
@@ -4,4 +4,7 @@
#port=34343
# Number of worker threads to deal with
-#workers=10
\ No newline at end of file
+#workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
diff --git
a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 49ddb63..3c1e48e 100644
---
a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -17,6 +17,8 @@
package org.apache.karaf.decanter.collector.socket;
import java.io.*;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -49,6 +51,8 @@
private static final Logger LOGGER =
LoggerFactory.getLogger(SocketCollector.class);
private ServerSocket serverSocket;
+ private DatagramSocket datagramSocket;
+ private Protocol protocol;
private EventAdmin eventAdmin;
private boolean open;
private ExecutorService executor;
@@ -56,6 +60,11 @@
private String eventAdminTopic;
private EventAdmin dispatcher;
private Unmarshaller unmarshaller;
+
+ private enum Protocol {
+ TCP,
+ UDP;
+ }
@SuppressWarnings("unchecked")
@Activate
@@ -63,8 +72,24 @@ public void activate(ComponentContext context) throws
IOException {
this.properties = context.getProperties();
int port = Integer.parseInt(getProperty(this.properties, "port",
"34343"));
int workers = Integer.parseInt(getProperty(this.properties, "workers",
"10"));
+
+ this.protocol = Protocol.valueOf(getProperty(this.properties,
"protocol", "tcp").toUpperCase());
+ // force TCP protocol if value not in Enum
+ if (this.protocol == null) {
+ this.protocol = Protocol.TCP;
+ }
+
eventAdminTopic = getProperty(this.properties,
EventConstants.EVENT_TOPIC, "decanter/collect/socket");
- this.serverSocket = new ServerSocket(port);
+
+ switch (protocol) {
+ case TCP:
+ this.serverSocket = new ServerSocket(port);
+ break;
+ case UDP:
+ this.datagramSocket = new DatagramSocket(port);
+ break;
+ }
+
// adding 1 for serverSocket handling
this.executor = Executors.newFixedThreadPool(workers + 1);
this.executor.execute(this);
@@ -79,9 +104,21 @@ private String getProperty(Dictionary<String, Object>
properties, String key, St
public void run() {
while (open) {
try {
- Socket socket = serverSocket.accept();
- LOGGER.debug("Connected to client at {}",
socket.getInetAddress());
- this.executor.execute(new SocketRunnable(socket));
+ switch (protocol) {
+ case TCP:
+ Socket socket = serverSocket.accept();
+ LOGGER.debug("Connected to TCP client at {}",
socket.getInetAddress());
+ this.executor.execute(new SocketRunnable(socket));
+ break;
+
+ case UDP:
+ byte[] buffer = new byte[1024];
+ DatagramPacket packet = new DatagramPacket(buffer,
buffer.length);
+ LOGGER.debug("Connected to UDP client at {}",
datagramSocket.getLocalSocketAddress());
+ datagramSocket.receive(packet);
+ this.executor.execute(new DatagramRunnable(packet));
+ break;
+ }
} catch (IOException e) {
LOGGER.warn("Exception receiving log.", e);
}
@@ -103,7 +140,15 @@ public void close() throws IOException {
} catch (Exception e) {
// nothing to do
}
- serverSocket.close();
+ switch (protocol) {
+ case TCP:
+ serverSocket.close();
+ break;
+
+ case UDP:
+ datagramSocket.close();
+ break;
+ }
}
@Reference
@@ -148,6 +193,41 @@ public void run() {
}
}
}
+
+ private class DatagramRunnable implements Runnable {
+
+ private DatagramPacket packet;
+
+ public DatagramRunnable(DatagramPacket packet) {
+ this.packet = packet;
+ }
+
+ public void run() {
+
+ try (ByteArrayInputStream bais = new
ByteArrayInputStream(packet.getData())) {
+ Map<String, Object> data = new HashMap<>();
+ data.put("hostAddress",
InetAddress.getLocalHost().getHostAddress());
+ data.put("hostName", InetAddress.getLocalHost().getHostName());
+ data.put("type", "socket");
+ String karafName = System.getProperty("karaf.name");
+ if (karafName != null) {
+ data.put("karafName", karafName);
+ }
+ try {
+ data.putAll(unmarshaller.unmarshal(bais));
+ } catch (Exception e) {
+ // nothing to do
+ }
+ Event event = new Event(eventAdminTopic, data);
+ dispatcher.postEvent(event);
+ datagramSocket.send(packet);
+ } catch (EOFException e) {
+ LOGGER.warn("Client closed the connection", e);
+ } catch (IOException e) {
+ LOGGER.warn("Exception receiving data", e);
+ }
+ }
+ }
@Reference
public void setDispatcher(EventAdmin dispatcher) {
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc
b/manual/src/main/asciidoc/user-guide/collectors.adoc
index c236409..8a8980d 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -469,10 +469,14 @@ This feature installs a default
`etc/org.apache.karaf.decanter.collector.socket.
# Number of worker threads to deal with
#workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
----
* the `port` property contains the port number where the network socket
collector is listening
* the `workers` property contains the number of worker thread the socket
collector is using for connection
+* the `protocol` property contains the protocol used by the collector for
transferring data with the client
==== JMS
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Collector socket - Add UDP protocol support
> -------------------------------------------
>
> Key: KARAF-5454
> URL: https://issues.apache.org/jira/browse/KARAF-5454
> Project: Karaf
> Issue Type: Improvement
> Components: decanter
> Reporter: Francois Papon
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
> Fix For: decanter-1.5.0
>
>
> Syslog sends data over the UDP protocol, which the current
> collector-socket does not support.
> We could use a DatagramSocket instead of a ServerSocket for UDP.
> Modifications :
> - Add a new property in the collector cfg file to define the protocol:
> # Protocol tcp(default) or udp
> #protocol=tcp
> - Instantiate a DatagramSocket when the udp protocol is defined in
> the cfg (org.apache.karaf.decanter.collector.socket.SocketCollector)
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)