exceptionfactory commented on a change in pull request #5044:
URL: https://github.com/apache/nifi/pull/5044#discussion_r632540020
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
##########
@@ -222,213 +224,97 @@ protected void init(final ProcessorInitializationContext
context) {
}
@OnScheduled
- public void onScheduled(final ProcessContext context) throws IOException {
- // initialize the queue of senders, one per task, senders will get
created on the fly in onTrigger
- this.senderPool = new
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
- }
-
- protected ChannelSender createSender(final ProcessContext context) throws
IOException {
- final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
- final String host =
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+ public void onScheduled(final ProcessContext context) throws
InterruptedException {
+ eventSender = getEventSender(context);
final String protocol = context.getProperty(PROTOCOL).getValue();
- final int maxSendBuffer =
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue();
- final int timeout =
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- return createSender(sslContextService, protocol, host, port,
maxSendBuffer, timeout);
- }
-
- // visible for testing to override and provide a mock sender if desired
- protected ChannelSender createSender(final SSLContextService
sslContextService, final String protocol, final String host,
- final int port, final int
maxSendBufferSize, final int timeout)
- throws IOException {
-
- ChannelSender sender;
- if (protocol.equals(UDP_VALUE.getValue())) {
- sender = new DatagramChannelSender(host, port, maxSendBufferSize,
getLogger());
- } else {
- // if an SSLContextService is provided then we make a secure sender
- if (sslContextService != null) {
- final SSLContext sslContext =
sslContextService.createContext();
- sender = new SSLSocketChannelSender(host, port,
maxSendBufferSize, sslContext, getLogger());
- } else {
- sender = new SocketChannelSender(host, port,
maxSendBufferSize, getLogger());
- }
- }
- sender.setTimeout(timeout);
- sender.open();
- return sender;
+ final String hostname =
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+ final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ transitUri = String.format("%s://%s:%s", protocol, hostname, port);
}
@OnStopped
- public void onStopped() {
- if (senderPool != null) {
- ChannelSender sender = senderPool.poll();
- while (sender != null) {
- sender.close();
- sender = senderPool.poll();
- }
+ public void onStopped() throws Exception {
+ if (eventSender != null) {
+ eventSender.close();
}
}
- private PruneResult pruneIdleSenders(final long idleThreshold){
- int numClosed = 0;
- int numConsidered = 0;
-
- long currentTime = System.currentTimeMillis();
- final List<ChannelSender> putBack = new ArrayList<>();
-
- // if a connection hasn't been used with in the threshold then it gets
closed
- ChannelSender sender;
- while ((sender = senderPool.poll()) != null) {
- numConsidered++;
- if (currentTime > (sender.getLastUsed() + idleThreshold)) {
- getLogger().debug("Closing idle connection...");
- sender.close();
- numClosed++;
- } else {
- putBack.add(sender);
- }
- }
-
- // re-queue senders that weren't idle, but if the queue is full then
close the sender
- for (ChannelSender putBackSender : putBack) {
- boolean returned = senderPool.offer(putBackSender);
- if (!returned) {
- putBackSender.close();
- }
- }
-
- return new PruneResult(numClosed, numConsidered);
- }
-
@Override
- public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- final String protocol = context.getProperty(PROTOCOL).getValue();
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
final int batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
final List<FlowFile> flowFiles = session.get(batchSize);
Review comment:
That's a good point. I removed the yield when I removed the unnecessary
pruning of idle senders, but it still makes sense to yield in the absence of
FlowFiles.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org