exceptionfactory commented on a change in pull request #5669:
URL: https://github.com/apache/nifi/pull/5669#discussion_r788035213



##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, 
FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;

Review comment:
       Recommend declaring these as separate variables following standard 
conventions.

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, 
FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) 
{
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, 
final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle 
the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and 
frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());

Review comment:
       Recommend including the sender in the debug log:
   
   ```suggestion
                       logger.debug("Received Beats Frame Sender [{}] 
Transaction [{}] Frame Type [{}]",
                               senderSocket, frame.getSeqNumber(), 
frame.getFrameType());
   ```

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, 
FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) 
{
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, 
final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle 
the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and 
frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());
+                    // Ignore the WINDOW SIZE type frames as they contain no 
payload.
+                    if (frame.getFrameType() != 0x57) {

Review comment:
       This check should use the static variable:
   ```suggestion
                       if (FRAME_WINDOWSIZE != frame.getFrameType()) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends 
SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BeatsMessageChannelHandler.class);
+    private final BlockingQueue<BeatsMessage> events;
+    private final BeatsEncoder encoder;
+
+    public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) {
+        this.events = events;
+        this.encoder = new BeatsEncoder();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
+        LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] 
", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence 
Number [{}]", msg.getSender(), msg.getSeqNumber());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new 
BeatsChannelResponse(encoder, 
BeatsResponse.ok(msg.getSeqNumber())).toByteArray()));
+        } else {
+            LOGGER.debug("Beats Queue Full: Failed Beats Message Sender [{}] 
Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
+            // TODO: Not sure if there's a way to respond with an error in 
Beats protocol..

Review comment:
       Recommend removing this comment if it cannot be implemented.

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, 
FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) 
{
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, 
final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle 
the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and 
frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());
+                    // Ignore the WINDOW SIZE type frames as they contain no 
payload.
+                    if (frame.getFrameType() != 0x57) {
+                        handle(frame, senderSocket, out);
+                    }
+                }
+            }
+        }
+        logger.debug("Done processing buffer");
+    }
+
+    private void handle(final BeatsFrame frame, final String sender, final 
List<Object> out) {
+        final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender);
+        metadata.put(BeatsMetadata.SEQNUMBER_KEY, 
String.valueOf(frame.getSeqNumber()));
+
+        /* If frameType is a JSON , parse the frame payload into a JsonElement 
so that all JSON elements but "message"
+        are inserted into the event metadata.
+
+        As per above, the "message" element gets added into the body of the 
event
+        */

Review comment:
       This comment does not seem to reflect the behavior.  It looks like the 
Beats Message will only be emitted when the Frame Type is JSON.  Is that 
correct?

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;
+    private final ComponentLog logger;
+    private final BeatsEncoder encoder;
+    private final BeatsMessageFactory messageFactory;
+
+    public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, 
FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
+
+    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) 
{
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new BeatsEncoder();
+        this.messageFactory = new BeatsMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, 
final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new BeatsDecoder(charset, logger);
+
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+
+            // decode the bytes and once we find the end of a frame, handle 
the frame
+            if (decoder.process(currByte)) {
+
+                final List<BeatsFrame> frames = decoder.getFrames();
+
+                for (BeatsFrame frame : frames) {
+                    logger.debug("Received Beats frame with transaction {} and 
frame type {}",
+                            frame.getSeqNumber(), frame.getFrameType());
+                    // Ignore the WINDOW SIZE type frames as they contain no 
payload.
+                    if (frame.getFrameType() != 0x57) {
+                        handle(frame, senderSocket, out);
+                    }
+                }
+            }
+        }
+        logger.debug("Done processing buffer");

Review comment:
       This log does not add much detail; it seems like it should be removed, 
or perhaps enhanced to include additional details such as the sender and total 
bytes.

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends 
SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BeatsMessageChannelHandler.class);
+    private final BlockingQueue<BeatsMessage> events;
+    private final BeatsEncoder encoder;
+
+    public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) {
+        this.events = events;
+        this.encoder = new BeatsEncoder();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
+        LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] 
", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence 
Number [{}]", msg.getSender(), msg.getSeqNumber());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new 
BeatsChannelResponse(encoder, 
BeatsResponse.ok(msg.getSeqNumber())).toByteArray()));
+        } else {
+            LOGGER.debug("Beats Queue Full: Failed Beats Message Sender [{}] 
Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());

Review comment:
       This should be a warning instead of a debug:
   ```suggestion
               LOGGER.warn("Beats Queue Full: Failed Beats Message Sender [{}] 
Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
   ```

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends 
SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BeatsMessageChannelHandler.class);

Review comment:
       Recommend passing the ComponentLog so that logs can be associated with 
the Processor.

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private BeatsDecoder decoder;

Review comment:
       Can this be refactored to a method-local variable?  It seems like it 
could introduce thread-safety issues as currently written.

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a BeatsMessage
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends 
SimpleChannelInboundHandler<BeatsMessage> {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BeatsMessageChannelHandler.class);
+    private final BlockingQueue<BeatsMessage> events;
+    private final BeatsEncoder encoder;
+
+    public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) {
+        this.events = events;
+        this.encoder = new BeatsEncoder();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
+        LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] 
", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence 
Number [{}]", msg.getSender(), msg.getSeqNumber());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new 
BeatsChannelResponse(encoder, 
BeatsResponse.ok(msg.getSeqNumber())).toByteArray()));

Review comment:
       This line is a bit hard to read given all of the nested calls.  Recommend 
breaking it into several lines for clarity.

##########
File path: 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
##########
@@ -14,24 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.event;
+package org.apache.nifi.processors.beats.netty;
 
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
 
 import java.util.Map;
 
 /**
- * An EventFactory implementation to create BeatEvents.
+ * An EventFactory implementation to create BeatsMessages.
  */
-public class BeatsEventFactory implements EventFactory<BeatsEvent> {
+public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
 
     @Override
-    public BeatsEvent create(final byte[] data, final Map<String, String> 
metadata, final ChannelResponder responder) {
-        final String sender = metadata.get(EventFactory.SENDER_KEY);
-        final int seqNumber = 
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
-
-        return new BeatsEvent(sender, data, responder, seqNumber);
+    public BeatsMessage create(final byte[] data, final Map<String, String> 
metadata) {
+        final int txnr = 
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));

Review comment:
       Recommend renaming this variable:
   ```suggestion
           final int sequenceNumber = 
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to