exceptionfactory commented on a change in pull request #5044:
URL: https://github.com/apache/nifi/pull/5044#discussion_r632540020



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutSyslog.java
##########
@@ -222,213 +224,97 @@ protected void init(final ProcessorInitializationContext 
context) {
     }
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException {
-        // initialize the queue of senders, one per task, senders will get 
created on the fly in onTrigger
-        this.senderPool = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
-    }
-
-    protected ChannelSender createSender(final ProcessContext context) throws 
IOException {
-        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
-        final String host = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+    public void onScheduled(final ProcessContext context) throws 
InterruptedException {
+        eventSender = getEventSender(context);
         final String protocol = context.getProperty(PROTOCOL).getValue();
-        final int maxSendBuffer = 
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).evaluateAttributeExpressions().asDataSize(DataUnit.B).intValue();
-        final int timeout = 
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        return createSender(sslContextService, protocol, host, port, 
maxSendBuffer, timeout);
-    }
-
-    // visible for testing to override and provide a mock sender if desired
-    protected ChannelSender createSender(final SSLContextService 
sslContextService, final String protocol, final String host,
-                                         final int port, final int 
maxSendBufferSize, final int timeout)
-            throws IOException {
-
-        ChannelSender sender;
-        if (protocol.equals(UDP_VALUE.getValue())) {
-            sender = new DatagramChannelSender(host, port, maxSendBufferSize, 
getLogger());
-        } else {
-            // if an SSLContextService is provided then we make a secure sender
-            if (sslContextService != null) {
-                final SSLContext sslContext = 
sslContextService.createContext();
-                sender = new SSLSocketChannelSender(host, port, 
maxSendBufferSize, sslContext, getLogger());
-            } else {
-                sender = new SocketChannelSender(host, port, 
maxSendBufferSize, getLogger());
-            }
-        }
-        sender.setTimeout(timeout);
-        sender.open();
-        return sender;
+        final String hostname = 
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+        final int port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+        transitUri = String.format("%s://%s:%s", protocol, hostname, port);
     }
 
     @OnStopped
-    public void onStopped() {
-        if (senderPool != null) {
-            ChannelSender sender = senderPool.poll();
-            while (sender != null) {
-                sender.close();
-                sender = senderPool.poll();
-            }
+    public void onStopped() throws Exception {
+        if (eventSender != null) {
+            eventSender.close();
         }
     }
 
-    private PruneResult pruneIdleSenders(final long idleThreshold){
-        int numClosed = 0;
-        int numConsidered = 0;
-
-        long currentTime = System.currentTimeMillis();
-        final List<ChannelSender> putBack = new ArrayList<>();
-
-        // if a connection hasn't been used with in the threshold then it gets 
closed
-        ChannelSender sender;
-        while ((sender = senderPool.poll()) != null) {
-            numConsidered++;
-            if (currentTime > (sender.getLastUsed() + idleThreshold)) {
-                getLogger().debug("Closing idle connection...");
-                sender.close();
-                numClosed++;
-            } else {
-                putBack.add(sender);
-            }
-        }
-
-        // re-queue senders that weren't idle, but if the queue is full then 
close the sender
-        for (ChannelSender putBackSender : putBack) {
-            boolean returned = senderPool.offer(putBackSender);
-            if (!returned) {
-                putBackSender.close();
-            }
-        }
-
-        return new PruneResult(numClosed, numConsidered);
-    }
-
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        final String protocol = context.getProperty(PROTOCOL).getValue();
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         final int batchSize = 
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
-
         final List<FlowFile> flowFiles = session.get(batchSize);

Review comment:
       That's a good point, I removed the yield when I removed the unnecessary 
pruning of idle senders, but it still makes sense to yield in the absence of 
Flow Files.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to