exceptionfactory commented on a change in pull request #5669: URL: https://github.com/apache/nifi/pull/5669#discussion_r788035213
########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processors.beats.frame.BeatsMetadata; +import org.apache.nifi.processors.beats.frame.BeatsDecoder; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.frame.BeatsFrame; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * Decode a Beats message's bytes into a BeatsMessage object + */ +public class BeatsFrameDecoder extends ByteToMessageDecoder { + + private Charset charset; + private BeatsDecoder decoder; + private final ComponentLog logger; + private final BeatsEncoder encoder; + private final BeatsMessageFactory messageFactory; + + public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a; Review comment: Recommend declaring these as separate variables following standard convetions. ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processors.beats.frame.BeatsMetadata; +import org.apache.nifi.processors.beats.frame.BeatsDecoder; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.frame.BeatsFrame; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * Decode a Beats message's bytes into a BeatsMessage object + */ +public class BeatsFrameDecoder extends ByteToMessageDecoder { + + private Charset charset; + private BeatsDecoder decoder; + private final ComponentLog logger; + private final BeatsEncoder encoder; + private final BeatsMessageFactory messageFactory; + + public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a; + + public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) { + this.charset = charset; + this.logger = logger; + this.encoder = new BeatsEncoder(); + this.messageFactory = new BeatsMessageFactory(); + } + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception { + final int total = in.readableBytes(); + final String senderSocket = ctx.channel().remoteAddress().toString(); + this.decoder = new BeatsDecoder(charset, logger); + + for (int i = 0; i < total; i++) { + byte currByte = in.readByte(); + + // decode the bytes and once we find the end of a frame, handle the frame + if (decoder.process(currByte)) { + + final List<BeatsFrame> frames = decoder.getFrames(); + + for (BeatsFrame frame : frames) { + logger.debug("Received Beats frame with transaction {} and frame type {}", + frame.getSeqNumber(), frame.getFrameType()); Review comment: Recommend including the sender in the debug log: ```suggestion logger.debug("Received Beats Frame Sender [{}] Transaction [{}] Frame Type [{}]", senderSocket, frame.getSeqNumber(), frame.getFrameType()); ``` ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processors.beats.frame.BeatsMetadata; +import org.apache.nifi.processors.beats.frame.BeatsDecoder; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.frame.BeatsFrame; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * Decode a Beats message's bytes into a BeatsMessage object + */ +public class BeatsFrameDecoder extends ByteToMessageDecoder { + + private Charset charset; + private BeatsDecoder decoder; + private final ComponentLog logger; + private final BeatsEncoder encoder; + private final BeatsMessageFactory messageFactory; + + public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a; + + public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) { + this.charset = charset; + this.logger = logger; + this.encoder = new BeatsEncoder(); + this.messageFactory = new BeatsMessageFactory(); + } + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception { + final int total = in.readableBytes(); + final String senderSocket = ctx.channel().remoteAddress().toString(); + this.decoder = new BeatsDecoder(charset, logger); + + for (int i = 0; i < total; i++) { + byte currByte = in.readByte(); + + // decode the bytes and once we find the end of a frame, handle the frame + if (decoder.process(currByte)) { + + final List<BeatsFrame> frames = decoder.getFrames(); + + for (BeatsFrame frame : frames) { + logger.debug("Received Beats frame with transaction {} and frame type {}", + frame.getSeqNumber(), frame.getFrameType()); + // Ignore the WINDOW SIZE type frames as they contain no payload. + if (frame.getFrameType() != 0x57) { Review comment: This check should use the static variable: ```suggestion if (FRAME_WINDOWSIZE != frame.getFrameType()) { ``` ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.response.BeatsChannelResponse; +import org.apache.nifi.processors.beats.response.BeatsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; + +/** + * Decode data received into a BeatsMessage + */ [email protected] +public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> { + + private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class); + private final BlockingQueue<BeatsMessage> events; + private final BeatsEncoder encoder; + + public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) { + this.events = events; + this.encoder = new BeatsEncoder(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) { + LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender()); + if (events.offer(msg)) { + LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); + ctx.writeAndFlush(Unpooled.wrappedBuffer(new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber())).toByteArray())); + } else { + LOGGER.debug("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); + // TODO: Not sure if there's a way to respond with an error in Beats protocol.. Review comment: Recommend removing this comment if it cannot be implemented. ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processors.beats.frame.BeatsMetadata; +import org.apache.nifi.processors.beats.frame.BeatsDecoder; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.frame.BeatsFrame; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * Decode a Beats message's bytes into a BeatsMessage object + */ +public class BeatsFrameDecoder extends ByteToMessageDecoder { + + private Charset charset; + private BeatsDecoder decoder; + private final ComponentLog logger; + private final BeatsEncoder encoder; + private final BeatsMessageFactory messageFactory; + + public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a; + + public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) { + this.charset = charset; + this.logger = logger; + this.encoder = new BeatsEncoder(); + this.messageFactory = new BeatsMessageFactory(); + } + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception { + final int total = in.readableBytes(); + final String senderSocket = ctx.channel().remoteAddress().toString(); + this.decoder = new BeatsDecoder(charset, logger); + + for (int i = 0; i < total; i++) { + byte currByte = in.readByte(); + + // decode the bytes and once we find the end of a frame, handle the frame + if (decoder.process(currByte)) { + + final List<BeatsFrame> frames = decoder.getFrames(); + + for (BeatsFrame frame : frames) { + logger.debug("Received Beats frame with transaction {} and frame type {}", + frame.getSeqNumber(), frame.getFrameType()); + // Ignore the WINDOW SIZE type frames as they contain no payload. + if (frame.getFrameType() != 0x57) { + handle(frame, senderSocket, out); + } + } + } + } + logger.debug("Done processing buffer"); + } + + private void handle(final BeatsFrame frame, final String sender, final List<Object> out) { + final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender); + metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(frame.getSeqNumber())); + + /* If frameType is a JSON , parse the frame payload into a JsonElement so that all JSON elements but "message" + are inserted into the event metadata. + + As per above, the "message" element gets added into the body of the event + */ Review comment: This comment does not seems to reflect the behavior. It looks like the Beats Message will only be emitted when the Frame Type is JSON. Is that correct? ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processors.beats.frame.BeatsMetadata; +import org.apache.nifi.processors.beats.frame.BeatsDecoder; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.frame.BeatsFrame; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * Decode a Beats message's bytes into a BeatsMessage object + */ +public class BeatsFrameDecoder extends ByteToMessageDecoder { + + private Charset charset; + private BeatsDecoder decoder; + private final ComponentLog logger; + private final BeatsEncoder encoder; + private final BeatsMessageFactory messageFactory; + + public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44, FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a; + + public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) { + this.charset = charset; + this.logger = logger; + this.encoder = new BeatsEncoder(); + this.messageFactory = new BeatsMessageFactory(); + } + + @Override + protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception { + final int total = in.readableBytes(); + final String senderSocket = ctx.channel().remoteAddress().toString(); + this.decoder = new BeatsDecoder(charset, logger); + + for (int i = 0; i < total; i++) { + byte currByte = in.readByte(); + + // decode the bytes and once we find the end of a frame, handle the frame + if (decoder.process(currByte)) { + + final List<BeatsFrame> frames = decoder.getFrames(); + + for (BeatsFrame frame : frames) { + logger.debug("Received Beats frame with transaction {} and frame type {}", + frame.getSeqNumber(), frame.getFrameType()); + // Ignore the WINDOW SIZE type frames as they contain no payload. + if (frame.getFrameType() != 0x57) { + handle(frame, senderSocket, out); + } + } + } + } + logger.debug("Done processing buffer"); Review comment: This log does not add much detail, it seems like it should be removed, or perhaps enhanced to include additional details such as the sender and total bytes. ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.response.BeatsChannelResponse; +import org.apache.nifi.processors.beats.response.BeatsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; + +/** + * Decode data received into a BeatsMessage + */ [email protected] +public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> { + + private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class); + private final BlockingQueue<BeatsMessage> events; + private final BeatsEncoder encoder; + + public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) { + this.events = events; + this.encoder = new BeatsEncoder(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) { + LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender()); + if (events.offer(msg)) { + LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); + ctx.writeAndFlush(Unpooled.wrappedBuffer(new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber())).toByteArray())); + } else { + LOGGER.debug("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); Review comment: This should be a warning instead of a debug: ```suggestion LOGGER.warn("Beats Queue Full: Failed Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); ``` ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.response.BeatsChannelResponse; +import org.apache.nifi.processors.beats.response.BeatsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; + +/** + * Decode data received into a BeatsMessage + */ [email protected] +public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> { + + private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class); Review comment: Recommend passing the ComponentLog so that logs can be associated with the Processor. ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processors.beats.frame.BeatsMetadata; +import org.apache.nifi.processors.beats.frame.BeatsDecoder; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.frame.BeatsFrame; + +import java.nio.charset.Charset; +import java.util.List; +import java.util.Map; + +/** + * Decode a Beats message's bytes into a BeatsMessage object + */ +public class BeatsFrameDecoder extends ByteToMessageDecoder { + + private Charset charset; + private BeatsDecoder decoder; Review comment: Can this be refactored to a method-local variable? It seems like it could introduce thread-safety issues as currently written. ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats.netty; + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.response.BeatsChannelResponse; +import org.apache.nifi.processors.beats.response.BeatsResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.BlockingQueue; + +/** + * Decode data received into a BeatsMessage + */ [email protected] +public class BeatsMessageChannelHandler extends SimpleChannelInboundHandler<BeatsMessage> { + + private static final Logger LOGGER = LoggerFactory.getLogger(BeatsMessageChannelHandler.class); + private final BlockingQueue<BeatsMessage> events; + private final BeatsEncoder encoder; + + public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events) { + this.events = events; + this.encoder = new BeatsEncoder(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) { + LOGGER.debug("Beats Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender()); + if (events.offer(msg)) { + LOGGER.debug("Event Queued: Beats Message Sender [{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber()); + ctx.writeAndFlush(Unpooled.wrappedBuffer(new BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber())).toByteArray())); Review comment: This line is a bit hard to read given all of the nested. Recommend breaking it into several lines for clarity. ########## File path: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java ########## @@ -14,24 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.beats.event; +package org.apache.nifi.processors.beats.netty; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.event.NetworkEventFactory; +import org.apache.nifi.processors.beats.frame.BeatsMetadata; import java.util.Map; /** - * An EventFactory implementation to create BeatEvents. + * An EventFactory implementation to create BeatsMessages. */ -public class BeatsEventFactory implements EventFactory<BeatsEvent> { +public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> { @Override - public BeatsEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) { - final String sender = metadata.get(EventFactory.SENDER_KEY); - final int seqNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY)); - - return new BeatsEvent(sender, data, responder, seqNumber); + public BeatsMessage create(final byte[] data, final Map<String, String> metadata) { + final int txnr = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY)); Review comment: Recommend renaming this variable: ```suggestion final int sequenceNumber = Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
