[
https://issues.apache.org/jira/browse/NIFI-274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14977613#comment-14977613
]
Tony Kurc edited comment on NIFI-274 at 10/28/15 2:47 AM:
----------------------------------------------------------
[~bende] - I'm a bit concerned about the networking threads (on listen). First
of all, how many ChannelReader threads did you want to have running at the same
time? It looks like one per processor, since the only thread that appears to be
created is the one in onScheduled.
If only one thread is expected, then looking at the DatagramChannel code, it
seems like only one buffer will be in use at a time: the single thread receives
a datagram, parses it, and synchronously drops it into a queue.
If that is the case, would buffer ever be null here?
{code:java}
if (buffer == null) {
    Thread.sleep(10L);
    logger.debug("no available buffers, continuing...");
    continue;
}
{code}
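To make the question concrete, here is a minimal, self-contained sketch. SingleReaderPoolDemo is a hypothetical stand-in for the patch's buffer pool; I'm assuming the real pool hands back null when it is empty, which is what the null check above suggests. With a single reader that returns its buffer before borrowing again, the empty-pool branch never runs; it only becomes reachable when something else holds buffers at the same time.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified stand-in for the patch's buffer pool:
// poll() returns null when every buffer is currently checked out.
class SingleReaderPoolDemo {
    private final BlockingQueue<ByteBuffer> pool;

    SingleReaderPoolDemo(int poolSize, int bufferSize) {
        pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.offer(ByteBuffer.allocate(bufferSize));
        }
    }

    ByteBuffer poll() {
        return pool.poll();
    }

    void returnBuffer(ByteBuffer buffer) {
        buffer.clear();
        pool.offer(buffer);
    }

    /**
     * One reader thread: borrow a buffer, handle one datagram synchronously,
     * return the buffer. Returns how often the pool came back empty.
     */
    int runSingleReader(int iterations) {
        int emptyPolls = 0;
        for (int i = 0; i < iterations; i++) {
            ByteBuffer buffer = poll();
            if (buffer == null) {
                emptyPolls++; // the branch the question is about
                continue;
            }
            try {
                buffer.put((byte) i); // stand-in for receive + parse + enqueue
            } finally {
                returnBuffer(buffer);
            }
        }
        return emptyPolls;
    }
}
```

If the real code behaves the same way, the null check and the 10 ms sleep are dead code with one reader; a pool only earns its keep once there are several readers.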
Secondly, you're using non-blocking sockets, but it looks like you're polling
with a fixed interval of 1s when no data is available. Have you considered using
a Selector (with select()) to decide when to call receive()?
{code:java}
final SocketAddress sender = datagramChannel.receive(buffer);
if (sender == null) {
    Thread.sleep(1000L); // nothing to do so wait...
}
{code}
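Roughly what I had in mind, as an untested sketch rather than a drop-in patch: the channel is registered with a Selector, and select(timeout) blocks until a datagram is readable (or the timeout expires) instead of sleeping a fixed second. SelectorReceiveSketch, receiveWithSelector, and the 5 s timeout are made up for illustration; DatagramChannel, Selector, and SelectionKey are the standard java.nio.channels classes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

class SelectorReceiveSketch {

    /**
     * Blocks in select() until the channel is readable or timeoutMillis
     * elapses; returns the datagram payload, or null on timeout.
     */
    static String receiveWithSelector(Selector selector, DatagramChannel channel,
                                      ByteBuffer buffer, long timeoutMillis) throws IOException {
        if (selector.select(timeoutMillis) == 0) {
            return null; // timed out with nothing to read
        }
        selector.selectedKeys().clear(); // only one channel is registered
        buffer.clear();
        SocketAddress sender = channel.receive(buffer);
        if (sender == null) {
            return null; // readiness was reported but no datagram was queued
        }
        buffer.flip();
        return StandardCharsets.UTF_8.decode(buffer).toString();
    }

    /** Sends one datagram over loopback and receives it via the Selector. */
    static String demo() {
        try (Selector selector = Selector.open();
             DatagramChannel receiver = DatagramChannel.open();
             DatagramChannel sender = DatagramChannel.open()) {
            receiver.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            receiver.configureBlocking(false); // must be non-blocking to register
            receiver.register(selector, SelectionKey.OP_READ);

            sender.send(ByteBuffer.wrap("<13>hello".getBytes(StandardCharsets.UTF_8)),
                    receiver.getLocalAddress());

            return receiveWithSelector(selector, receiver, ByteBuffer.allocate(1024), 5000L);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A side benefit is that stopping the processor can call selector.wakeup() to break the reader out of select() immediately, rather than waiting out a sleep.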
I'm still working through the TCP code, but I was hoping you could correct me
if I'm reading it wrong. The general gist, as I read it:
1. Accept a connection.
2. Spawn off a thread (no more than 2) to handle some buffer-fulls of syslog
messages until nothing else is read, then close the socket. The messages are
pushed onto the queue of SyslogEvents.
From some Wireshark captures, rsyslogd seems reasonably happy with a very
long-lived TCP connection, and actually seems unhappy when I close a connection.
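For comparison, a sketch of a per-connection handler that keeps the socket open until the sender closes it. It uses plain blocking java.net sockets for brevity rather than the patch's NIO channels, and LongLivedConnectionSketch and drainUntilPeerCloses are hypothetical names. The point is that only read() returning -1 (peer closed) ends the connection; a quiet period between messages just blocks.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class LongLivedConnectionSketch {

    /**
     * Reads from one accepted connection until the peer closes it
     * (read() returns -1). An idle gap blocks in read(); it does not
     * end the connection.
     */
    static byte[] drainUntilPeerCloses(Socket socket) throws IOException {
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try (InputStream in = socket.getInputStream()) {
            int n;
            while ((n = in.read(chunk)) != -1) {
                received.write(chunk, 0, n); // hand off to framing/parsing here
            }
        }
        return received.toByteArray();
    }

    /** A client sends two messages with a pause in between, then closes. */
    static String demo() {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread client = new Thread(() -> {
                try (Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                     OutputStream out = s.getOutputStream()) {
                    out.write("<13>first\n".getBytes(StandardCharsets.UTF_8));
                    out.flush();
                    Thread.sleep(300); // idle gap, like a quiet syslog sender
                    out.write("<13>second\n".getBytes(StandardCharsets.UTF_8));
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            client.start();
            try (Socket accepted = server.accept()) {
                String result = new String(drainUntilPeerCloses(accepted), StandardCharsets.UTF_8);
                client.join();
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The trade-off is that a blocking handler ties up one thread per open connection, so a cap of 2 threads would also cap concurrent senders at 2; a Selector-based loop avoids that.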
Also, in your loop, it looks like if your buffer doesn't end in a \n, you
return that buffer to the pool with data still on it.
{code:java}
    for (int i = 0; i < bufferLength; i++) {
        byte currByte = buffer.get(i);
        currBytes.write(currByte);

        // at the end of a message so parse an event, reset the buffer, and break out of the loop
        if (currByte == '\n') {
            final SyslogEvent event = syslogParser.parseEvent(currBytes.toByteArray());
            logger.trace(event.getFullMessage());
            syslogEvents.put(event); // block until space is available
            currBytes.reset();
        }
    }
} finally {
    bufferPool.returnBuffer(buffer, 0);
}
{code}
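One way to avoid leaving data behind is to keep the partial-message bytes per connection instead of per buffer, and to clear the buffer before it goes back to the pool. A minimal sketch, assuming newline-delimited messages; NewlineFramer is a hypothetical helper, and unlike the loop above it strips the trailing \n from each message.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical per-connection framer: splits a byte stream on '\n' and keeps
 * any trailing partial message until the next buffer arrives.
 */
class NewlineFramer {
    // Lives as long as the connection, not as long as a single buffer.
    private final ByteArrayOutputStream partial = new ByteArrayOutputStream();

    /** Consumes the buffer's remaining bytes and returns every complete message. */
    List<byte[]> feed(ByteBuffer buffer) {
        List<byte[]> messages = new ArrayList<>();
        while (buffer.hasRemaining()) {
            byte b = buffer.get();
            if (b == '\n') {
                messages.add(partial.toByteArray());
                partial.reset();
            } else {
                partial.write(b);
            }
        }
        buffer.clear(); // nothing is left behind when the buffer returns to a pool
        return messages;
    }

    /** Bytes received after the last '\n', still waiting for their terminator. */
    int pendingBytes() {
        return partial.size();
    }

    /** One message split across two reads, followed by a complete one. */
    static List<String> demo() {
        NewlineFramer framer = new NewlineFramer();
        List<String> out = new ArrayList<>();
        String[] reads = {"<13>first half ", "second half\n<14>next\n"};
        for (String r : reads) {
            ByteBuffer buf = ByteBuffer.wrap(r.getBytes(StandardCharsets.UTF_8));
            for (byte[] m : framer.feed(buf)) {
                out.add(new String(m, StandardCharsets.UTF_8));
            }
        }
        return out;
    }
}
```

Here the message that straddles the two reads comes out whole, and the framer holds no pending bytes once the final \n arrives.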
> Syslog processors
> -----------------
>
> Key: NIFI-274
> URL: https://issues.apache.org/jira/browse/NIFI-274
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 0.1.0
> Environment: any
> Reporter: Corey Flowers
> Assignee: Tony Kurc
> Priority: Minor
> Labels: features, syslog
> Fix For: 0.4.0
>
> Attachments: NIFI-274-2.patch, NIFI-274-3.patch, NIFI-274-4.patch,
> NIFI-274.patch, SyslogProcessorTesting.xml
>
>
> Request to add syslog processors for direct interaction with syslog, to
> include pulls and pushes of log info.