exceptionfactory commented on a change in pull request #5398:
URL: https://github.com/apache/nifi/pull/5398#discussion_r734822532



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,27 +100,84 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, 
CLIENT_AUTH);
-    }
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPMessage> events;
+    protected volatile BlockingQueue<RELPMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile SSLContext sslContext;
+    protected volatile ClientAuth clientAuth;
+    protected volatile int maxConnections;
+    protected volatile int bufferSize;
+    protected volatile EventBatcher eventBatcher;
 
-    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+        maxConnections = 
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        bufferSize = 
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = 
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        charset = 
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = 
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new 
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        eventBatcher = getEventBatcher();
+
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = 
context.getProperty(CLIENT_AUTH).getValue();
+            sslContext = sslContextService.createContext();
+            clientAuth = ClientAuth.valueOf(clientAuthValue);
+        }
+
+        initializeRelpServer();
+    }
+
+    @OnStopped
+    public void stopped() {
+        if (eventServer != null) {
+            eventServer.shutdown();
+        }
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);

Review comment:
       The `Local Network Interface` property was previously the first 
property. Was it moved to keep optional properties together, or should it be 
moved back to the first position?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
##########
@@ -96,27 +100,84 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this 
relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, 
CLIENT_AUTH);
-    }
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile Charset charset;
+    protected volatile BlockingQueue<RELPMessage> events;
+    protected volatile BlockingQueue<RELPMessage> errorEvents;
+    protected volatile InetAddress hostname;
+    protected EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile SSLContext sslContext;
+    protected volatile ClientAuth clientAuth;
+    protected volatile int maxConnections;
+    protected volatile int bufferSize;
+    protected volatile EventBatcher eventBatcher;
 
-    @Override
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+        maxConnections = 
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        bufferSize = 
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = 
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        charset = 
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = 
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new 
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        eventBatcher = getEventBatcher();
+
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final String clientAuthValue = 
context.getProperty(CLIENT_AUTH).getValue();
+            sslContext = sslContextService.createContext();
+            clientAuth = ClientAuth.valueOf(clientAuthValue);
+        }
+
+        initializeRelpServer();
+    }
+
+    @OnStopped
+    public void stopped() {
+        if (eventServer != null) {
+            eventServer.shutdown();
+        }
+    }
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(CLIENT_AUTH);
+        descriptors.add(SSL_CONTEXT_SERVICE);

Review comment:
       The order of these two properties is reversed from the previous 
implementation. Recommend restoring the original order and adding `dependsOn()` 
to the Client Auth property so that it is only visible when the SSL Context 
Service is configured.
   ```suggestion
           descriptors.add(SSL_CONTEXT_SERVICE);
           descriptors.add(CLIENT_AUTH);
   ```

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
##########
@@ -178,16 +175,10 @@ public void onScheduled(final ProcessContext context) 
throws IOException {
         charset = Charset.forName(context.getProperty(CHARSET).getValue());
         port = 
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
         events = new 
LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
-
         final String nicIPAddressStr = 
context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
-        final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
-        InetAddress nicIPAddress = null;
-        if (!StringUtils.isEmpty(nicIPAddressStr)) {
-            NetworkInterface netIF = 
NetworkInterface.getByName(nicIPAddressStr);
-            nicIPAddress = netIF.getInetAddresses().nextElement();
-        }
+        final InetAddress nicIPAddress = 
NetworkUtils.getInterfaceAddress(nicIPAddressStr);

Review comment:
       Recommend renaming variables:
   ```suggestion
           final InetAddress interfaceAddress = 
NetworkUtils.getInterfaceAddress(interfaceName);
   ```

##########
File path: 
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
##########
@@ -89,4 +93,21 @@ public static boolean isListening(final String hostname, 
final int port, final i
 
         return (result != null && result);
     }
+
+    public static InetAddress getInterfaceAddress(final String 
interfaceIPAddress) throws SocketException {
+        InetAddress nicIPAddress = null;
+        if (interfaceIPAddress != null && !interfaceIPAddress.isEmpty()) {
+            NetworkInterface netIF = 
NetworkInterface.getByName(interfaceIPAddress);
+            nicIPAddress = netIF.getInetAddresses().nextElement();
+        }
+        return nicIPAddress;
+    }

Review comment:
       Recommend adding some documentation to this method indicating that if 
the `interfaceName` is not matched, this method will return `null`. Also 
recommend updating the variable names for improved clarity:
   ```suggestion
       /**
        * Get Interface Address using interface name
        *
        * @param interfaceName Network Interface Name
        * @return Interface Address or null when matching network interface 
name not found
        * @throws SocketException Thrown when failing to get interface addresses
        */
       public static InetAddress getInterfaceAddress(final String 
interfaceName) throws SocketException {
           InetAddress interfaceAddress = null;
           if (interfaceName != null && !interfaceName.isEmpty()) {
               final NetworkInterface networkInterface = 
NetworkInterface.getByName(interfaceName);
               interfaceAddress = 
networkInterface.getInetAddresses().nextElement();
           }
           return interfaceAddress;
       }
   ```

##########
File path: 
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
##########
@@ -89,4 +93,21 @@ public static boolean isListening(final String hostname, 
final int port, final i
 
         return (result != null && result);
     }
+
+    public static InetAddress getInterfaceAddress(final String 
interfaceIPAddress) throws SocketException {
+        InetAddress nicIPAddress = null;
+        if (interfaceIPAddress != null && !interfaceIPAddress.isEmpty()) {
+            NetworkInterface netIF = 
NetworkInterface.getByName(interfaceIPAddress);
+            nicIPAddress = netIF.getInetAddresses().nextElement();
+        }
+        return nicIPAddress;
+    }
+
+    public static InetAddress getDefaultInterfaceAddress() {

Review comment:
       Is this method used, or can it be removed?

##########
File path: 
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends ByteArrayMessage> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, 
final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped 
together based on a batch key which
+     * by default is the sender of the event, but can be overriden by 
sub-classes.
+     * <p>
+     * This method will return when batchSize has been reached, or when no 
more events are available on the queue.
+     *
+     * @param session                the current session
+     * @param totalBatchSize         the total number of events to process
+     * @param messageDemarcatorBytes the demarcator to put between messages 
when writing to a FlowFile
+     * @return a Map from the batch key to the FlowFile and events for that 
batch, the size of events in all
+     * the batches will be <= batchSize
+     */
+    public Map<String, FlowFileEventBatch> getBatches(final ProcessSession 
session, final int totalBatchSize,
+                                                      final byte[] 
messageDemarcatorBytes) {
+
+        final Map<String, FlowFileEventBatch> batches = new HashMap<String, 
FlowFileEventBatch>();
+        for (int i = 0; i < totalBatchSize; i++) {
+            final E event = getMessage(true, true, session);
+            if (event == null) {
+                break;
+            }
+
+            final String batchKey = getBatchKey(event);
+            FlowFileEventBatch batch = batches.get(batchKey);
+
+            // if we don't have a batch for this key then create a new one
+            if (batch == null) {
+                batch = new FlowFileEventBatch(session.create(), new 
ArrayList<E>());
+                batches.put(batchKey, batch);
+            }
+
+            // add the current event to the batch
+            batch.getEvents().add(event);
+
+            // append the event's data to the FlowFile, write the demarcator 
first if not on the first event
+            final boolean writeDemarcator = (i > 0);
+            try {
+                final byte[] rawMessage = event.getMessage();
+                FlowFile appendedFlowFile = 
session.append(batch.getFlowFile(), new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws 
IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+
+                // update the FlowFile reference in the batch object
+                batch.setFlowFile(appendedFlowFile);
+
+            } catch (final Exception e) {
+                logger.error("Failed to write contents of the message to 
FlowFile due to {}; will re-queue message and try again",
+                        e.getMessage(), e);
+                errorEvents.offer(event);
+                break;
+            }
+        }
+
+        return batches;
+    }
+
+    protected abstract String getBatchKey(E event);

Review comment:
       Recommend adding a method comment here to describe the expected 
implementation.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to