This is an automated email from the ASF dual-hosted git repository.

thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c1a7da116 NIFI-10737 Corrected ListenBeats buffer handling
7c1a7da116 is described below

commit 7c1a7da1169f66fce490def753f9a0a228a4f75b
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Oct 31 22:30:32 2022 -0500

    NIFI-10737 Corrected ListenBeats buffer handling
    
    - Added test class for ListenBeats
    - Removed unnecessary dependencies
    - Implemented BatchDecoder for reading Beats Protocol frames
    - Refactored protocol and handler classes
    
    Signed-off-by: Nathan Gough <[email protected]>
    
    This closes #6608.
---
 .../nifi-beats-processors/pom.xml                  |  46 +--
 .../apache/nifi/processors/beats/ListenBeats.java  | 180 ++++------
 .../nifi/processors/beats/frame/BeatsDecoder.java  | 328 ------------------
 .../nifi/processors/beats/frame/BeatsEncoder.java  |  47 ---
 .../nifi/processors/beats/frame/BeatsFrame.java    | 115 -------
 .../beats/handler/BatchChannelInboundHandler.java  |  83 +++++
 .../processors/beats/handler/BatchDecoder.java     | 380 +++++++++++++++++++++
 .../beats/handler/MessageAckEncoder.java           |  65 ++++
 .../processors/beats/netty/BeatsFrameDecoder.java  |  81 -----
 .../beats/netty/BeatsMessageChannelHandler.java    |  57 ----
 .../Batch.java}                                    |  21 +-
 .../BatchMessage.java}                             |  18 +-
 .../FrameType.java}                                |  29 +-
 .../FrameTypeDecoder.java}                         |  25 +-
 .../BeatsState.java => protocol/MessageAck.java}   |  20 +-
 .../ProtocolCode.java}                             |  15 +-
 .../ProtocolCodeDecoder.java}                      |  21 +-
 .../ProtocolException.java}                        |  25 +-
 .../ProtocolVersion.java}                          |  23 +-
 .../ProtocolVersionDecoder.java}                   |  25 +-
 .../beats/response/BeatsChannelResponse.java       |  42 ---
 .../processors/beats/response/BeatsResponse.java   |  62 ----
 .../BeatsMessageServerFactory.java                 |  25 +-
 .../nifi/processors/beats/ListenBeatsTest.java     | 244 +++++++++++++
 .../processors/beats/frame/TestBeatsEncoder.java   |  49 ---
 .../processors/beats/frame/TestBeatsFrame.java     |  39 ---
 26 files changed, 995 insertions(+), 1070 deletions(-)

diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
index 91a030f4e1..1aeccb4b81 100644
--- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
@@ -34,31 +34,18 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-event-listen</artifactId>
             <version>1.19.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-security-socket-ssl</artifactId>
-            <version>1.19.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-socket-utils</artifactId>
-            <version>1.19.0-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-security-socket-ssl</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-utils</artifactId>
             <version>1.19.0-SNAPSHOT</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-flowfile-packager</artifactId>
-            <version>1.19.0-SNAPSHOT</version>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-ssl-context-service-api</artifactId>
@@ -71,25 +58,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-    <profiles>
-        <profile>
-            <!-- This profile, activating when compiling on Java versions 
above 1.8, provides configuration changes to
-                 allow NiFi to be compiled on those JDKs. -->
-            <id>jigsaw</id>
-            <activation>
-                <jdk>(1.8,)</jdk>
-            </activation>
-            <dependencies>
-                <dependency>
-                    <groupId>jakarta.xml.bind</groupId>
-                    <artifactId>jakarta.xml.bind-api</artifactId>
-                </dependency>
-                <dependency>
-                    <groupId>org.glassfish.jaxb</groupId>
-                    <artifactId>jaxb-runtime</artifactId>
-                </dependency>
-            </dependencies>
-        </profile>
-    </profiles>
 </project>
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
index 86cc2df339..4248c31dcf 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
@@ -16,20 +16,18 @@
  */
 package org.apache.nifi.processors.beats;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.event.transport.EventException;
 import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
 import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -38,14 +36,13 @@ import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.listen.EventBatcher;
 import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
 import org.apache.nifi.processor.util.listen.ListenerProperties;
-import org.apache.nifi.processors.beats.netty.BeatsMessage;
-import org.apache.nifi.processors.beats.netty.BeatsMessageServerFactory;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
@@ -55,11 +52,9 @@ import javax.net.ssl.SSLContext;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -67,34 +62,30 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"listen", "beats", "tcp", "logs"})
-@CapabilityDescription("Listens for messages sent by libbeat compatible 
clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', 
writing its JSON formatted payload " +
-        "to the content of a FlowFile." +
-        "This processor replaces the now deprecated/removed ListenLumberjack")
+@Tags({"beats", "logstash", "elasticsearch", "log"})
+@CapabilityDescription("Receive messages encoded using the Elasticsearch Beats 
protocol and write decoded JSON")
 @WritesAttributes({
-    @WritesAttribute(attribute = "beats.sender", description = "The sending 
host of the messages."),
-    @WritesAttribute(attribute = "beats.port", description = "The sending port 
the messages were received over."),
-    @WritesAttribute(attribute = "beats.sequencenumber", description = "The 
sequence number of the message. Only included if <Batch Size> is 1."),
+    @WritesAttribute(attribute = "beats.sender", description = "Internet 
Protocol address of the message sender"),
+    @WritesAttribute(attribute = "beats.port", description = "TCP port on 
which the Processor received messages"),
+    @WritesAttribute(attribute = "beats.sequencenumber", description = "The 
sequence number of the message included for batches containing single 
messages"),
     @WritesAttribute(attribute = "mime.type", description = "The mime.type of 
the content which is application/json")
 })
-@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
 public class ListenBeats extends AbstractProcessor {
 
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
         .name("SSL_CONTEXT_SERVICE")
         .displayName("SSL Context Service")
-        .description("The Controller Service to use in order to obtain an SSL 
Context. If this property is set, " +
-            "messages will be received over a secure connection.")
-        // Nearly all Lumberjack v1 implementations require TLS to work. v2 
implementations (i.e. beats) have TLS as optional
+        .description("SSL Context Service is required to enable TLS for socket 
connections")
         .required(false)
         .identifiesControllerService(RestrictedSSLContextService.class)
         .build();
 
     public static final PropertyDescriptor CLIENT_AUTH = new 
PropertyDescriptor.Builder()
         .name("Client Auth")
-        .displayName("Client Auth")
-        .description("The client authentication policy to use for the SSL 
Context. Only used if an SSL Context Service is provided.")
+        .displayName("Client Authentication")
+        .description("Client authentication policy when TLS is enabled")
         .required(false)
+        .dependsOn(SSL_CONTEXT_SERVICE)
         .allowableValues(ClientAuth.values())
         .defaultValue(ClientAuth.REQUIRED.name())
         .build();
@@ -104,73 +95,43 @@ public class ListenBeats extends AbstractProcessor {
             .description("Messages received successfully will be sent out this 
relationship.")
             .build();
 
-    protected List<PropertyDescriptor> descriptors;
-    protected Set<Relationship> relationships;
+    private static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            ListenerProperties.NETWORK_INTF_NAME,
+            ListenerProperties.PORT,
+            ListenerProperties.RECV_BUFFER_SIZE,
+            ListenerProperties.MAX_MESSAGE_QUEUE_SIZE,
+            ListenerProperties.MAX_SOCKET_BUFFER_SIZE,
+            ListenerProperties.CHARSET,
+            ListenerProperties.MAX_BATCH_SIZE,
+            ListenerProperties.MESSAGE_DELIMITER,
+            ListenerProperties.WORKER_THREADS,
+            SSL_CONTEXT_SERVICE,
+            CLIENT_AUTH
+    ));
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
+
     protected volatile int port;
-    protected volatile BlockingQueue<BeatsMessage> events;
-    protected volatile BlockingQueue<BeatsMessage> errorEvents;
+    protected volatile BlockingQueue<BatchMessage> events;
+    protected volatile BlockingQueue<BatchMessage> errorEvents;
     protected volatile EventServer eventServer;
     protected volatile byte[] messageDemarcatorBytes;
-    protected volatile EventBatcher<BeatsMessage> eventBatcher;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        final List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
-        descriptors.add(ListenerProperties.PORT);
-        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
-        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
-        // Deprecated
-        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
-        descriptors.add(ListenerProperties.CHARSET);
-        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
-        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
-        descriptors.add(ListenerProperties.WORKER_THREADS);
-        descriptors.add(SSL_CONTEXT_SERVICE);
-        descriptors.add(CLIENT_AUTH);
-        this.descriptors = Collections.unmodifiableList(descriptors);
-
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        this.relationships = Collections.unmodifiableSet(relationships);
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        final SSLContextService sslContextService = 
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
-        if (sslContextService != null && 
!sslContextService.isTrustStoreConfigured()) {
-            results.add(new ValidationResult.Builder()
-                .explanation("SSL Context Service requires a truststore for 
the Beats forwarder client to work correctly")
-                .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
-        }
-
-        final String clientAuth = 
validationContext.getProperty(CLIENT_AUTH).getValue();
-        if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
-            results.add(new ValidationResult.Builder()
-                    .explanation("Client Auth must be provided when using 
TLS/SSL")
-                    .valid(false).subject("Client Auth").build());
-        }
-
-        return results;
-    }
+    protected volatile EventBatcher<BatchMessage> eventBatcher;
 
     @Override
     public final Set<Relationship> getRelationships() {
-        return this.relationships;
+        return RELATIONSHIPS;
     }
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return descriptors;
+        return DESCRIPTORS;
     }
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) throws IOException {
         final int workerThreads = 
context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
-        final int bufferSize = 
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final int socketBufferSize = 
context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
         final String networkInterface = 
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
         final InetAddress address = 
NetworkUtils.getInterfaceAddress(networkInterface);
         final Charset charset = 
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@@ -180,7 +141,7 @@ public class ListenBeats extends AbstractProcessor {
         final String msgDemarcator = getMessageDemarcator(context);
         messageDemarcatorBytes = msgDemarcator.getBytes(charset);
 
-        final NettyEventServerFactory eventFactory = new 
BeatsMessageServerFactory(getLogger(), address, port, charset, events);
+        final NettyEventServerFactory eventFactory = new 
BeatsMessageServerFactory(getLogger(), address, port, events);
 
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         if (sslContextService != null) {
@@ -191,80 +152,75 @@ public class ListenBeats extends AbstractProcessor {
             eventFactory.setClientAuth(clientAuth);
         }
 
-        eventFactory.setSocketReceiveBuffer(bufferSize);
+        eventFactory.setSocketReceiveBuffer(socketBufferSize);
         eventFactory.setWorkerThreads(workerThreads);
         eventFactory.setThreadNamePrefix(String.format("%s[%s]", 
getClass().getSimpleName(), getIdentifier()));
+        
eventFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+        eventFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
 
         try {
             eventServer = eventFactory.getEventServer();
-        } catch (EventException e) {
+        } catch (final EventException e) {
             getLogger().error("Failed to bind to [{}:{}]", address, port, e);
         }
     }
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        EventBatcher<BeatsMessage> eventBatcher = getEventBatcher();
+        EventBatcher<BatchMessage> eventBatcher = getEventBatcher();
 
         final int batchSize = 
context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
-        Map<String, FlowFileEventBatch<BeatsMessage>> batches = 
eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+        Map<String, FlowFileEventBatch<BatchMessage>> batches = 
eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
         processEvents(session, batches);
     }
 
     @OnStopped
-    public void stopped() {
-        if (eventServer != null) {
+    public void shutdown() {
+        if (eventServer == null) {
+            getLogger().warn("Event Server not configured");
+        } else {
             eventServer.shutdown();
         }
         eventBatcher = null;
     }
 
-    private void processEvents(final ProcessSession session, final Map<String, 
FlowFileEventBatch<BeatsMessage>> batches) {
-        for (Map.Entry<String, FlowFileEventBatch<BeatsMessage>> entry : 
batches.entrySet()) {
+    private void processEvents(final ProcessSession session, final Map<String, 
FlowFileEventBatch<BatchMessage>> batches) {
+        for (final Map.Entry<String, FlowFileEventBatch<BatchMessage>> entry : 
batches.entrySet()) {
             FlowFile flowFile = entry.getValue().getFlowFile();
-            final List<BeatsMessage> events = entry.getValue().getEvents();
+            final List<BatchMessage> events = entry.getValue().getEvents();
 
             if (flowFile.getSize() == 0L || events.size() == 0) {
                 session.remove(flowFile);
-                getLogger().debug("No data written to FlowFile from batch {}; 
removing FlowFile", entry.getKey());
                 continue;
             }
 
             final Map<String,String> attributes = 
getAttributes(entry.getValue());
             flowFile = session.putAllAttributes(flowFile, attributes);
 
-            getLogger().debug("Transferring {} to success", flowFile);
             session.transfer(flowFile, REL_SUCCESS);
-            session.adjustCounter("FlowFiles Transferred to Success", 1L, 
false);
 
-            // the sender and command will be the same for all events based on 
the batch key
             final String transitUri = getTransitUri(entry.getValue());
             session.getProvenanceReporter().receive(flowFile, transitUri);
-
         }
-        session.commitAsync();
     }
 
-    protected String getTransitUri(FlowFileEventBatch<BeatsMessage> batch) {
-        final List<BeatsMessage> events = batch.getEvents();
+    private String getTransitUri(final FlowFileEventBatch<BatchMessage> batch) 
{
+        final List<BatchMessage> events = batch.getEvents();
         final String sender = events.get(0).getSender();
-        final String senderHost = sender.startsWith("/") && sender.length() > 
1 ? sender.substring(1) : sender;
-        return String.format("beats://%s:%d", senderHost, port);
+        return String.format("beats://%s:%d", sender, port);
     }
 
-    protected Map<String, String> 
getAttributes(FlowFileEventBatch<BeatsMessage> batch) {
-        final List<BeatsMessage> events = batch.getEvents();
-        // the sender and command will be the same for all events based on the 
batch key
+    private Map<String, String> getAttributes(final 
FlowFileEventBatch<BatchMessage> batch) {
+        final List<BatchMessage> events = batch.getEvents();
+
         final String sender = events.get(0).getSender();
-        final int numAttributes = events.size() == 1 ? 5 : 4;
-        final Map<String, String> attributes = new HashMap<>(numAttributes);
-        attributes.put(beatsAttributes.SENDER.key(), sender);
-        attributes.put(beatsAttributes.PORT.key(), String.valueOf(port));
+        final Map<String, String> attributes = new LinkedHashMap<>();
+        attributes.put(BeatsAttributes.SENDER.key(), sender);
+        attributes.put(BeatsAttributes.PORT.key(), String.valueOf(port));
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
-        // if there was only one event then we can pass on the transaction
-        // NOTE: we could pass on all the transaction ids joined together
+
         if (events.size() == 1) {
-            attributes.put(beatsAttributes.SEQNUMBER.key(), 
String.valueOf(events.get(0).getSeqNumber()));
+            attributes.put(BeatsAttributes.SEQUENCE_NUMBER.key(), 
String.valueOf(events.get(0).getSequenceNumber()));
         }
         return attributes;
     }
@@ -275,11 +231,11 @@ public class ListenBeats extends AbstractProcessor {
                 .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", 
"\t");
     }
 
-    private EventBatcher<BeatsMessage> getEventBatcher() {
+    private EventBatcher<BatchMessage> getEventBatcher() {
         if (eventBatcher == null) {
-            eventBatcher = new EventBatcher<BeatsMessage>(getLogger(), events, 
errorEvents) {
+            eventBatcher = new EventBatcher<BatchMessage>(getLogger(), events, 
errorEvents) {
                 @Override
-                protected String getBatchKey(BeatsMessage event) {
+                protected String getBatchKey(final BatchMessage event) {
                     return event.getSender();
                 }
             };
@@ -287,14 +243,14 @@ public class ListenBeats extends AbstractProcessor {
         return eventBatcher;
     }
 
-    public enum beatsAttributes implements FlowFileAttributeKey {
+    private enum BeatsAttributes implements FlowFileAttributeKey {
         SENDER("beats.sender"),
         PORT("beats.port"),
-        SEQNUMBER("beats.sequencenumber");
+        SEQUENCE_NUMBER("beats.sequencenumber");
 
         private final String key;
 
-        beatsAttributes(String key) {
+        BeatsAttributes(String key) {
             this.key = key;
         }
 
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
deleted file mode 100644
index 2fa20ca856..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.zip.InflaterInputStream;
-
-/**
- * Decodes a Beats frame by maintaining a state based on each byte that has 
been processed. This class
- * should not be shared by multiple threads.
- */
-public class BeatsDecoder {
-
-
-    final ComponentLog logger;
-
-    private BeatsFrame.Builder frameBuilder;
-    private BeatsState currState = BeatsState.VERSION;
-    private byte decodedFrameType;
-
-    private byte[] unprocessedData;
-
-    private final Charset charset;
-    private final ByteArrayOutputStream currBytes;
-
-    private long windowSize;
-
-    static final int MIN_FRAME_HEADER_LENGTH = 2; // Version + Type
-    static final int WINDOWSIZE_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32bit 
unsigned window size
-    static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 
32 bit unsigned + payload
-    static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit 
unsigned sequence number + 32 bit unsigned payload length
-
-    /**
-     * @param charset the charset to decode bytes from the frame
-     */
-    public BeatsDecoder(final Charset charset, final ComponentLog logger) {
-        this(charset, new ByteArrayOutputStream(4096), logger);
-    }
-
-    /**
-     * @param charset the charset to decode bytes from the frame
-     * @param buffer  a buffer to use while processing the bytes
-     */
-    public BeatsDecoder(final Charset charset, final ByteArrayOutputStream 
buffer, final ComponentLog logger) {
-        this.logger = logger;
-        this.charset = charset;
-        this.currBytes = buffer;
-        this.frameBuilder = new BeatsFrame.Builder();
-        this.decodedFrameType = 0x00;
-    }
-
-    /**
-     * Resets this decoder back to its initial state.
-     */
-    public void reset() {
-        frameBuilder = new BeatsFrame.Builder();
-        currState = BeatsState.VERSION;
-        decodedFrameType = 0x00;
-        currBytes.reset();
-    }
-
-    /**
-     * Process the next byte from the channel, updating the builder and state 
accordingly.
-     *
-     * @param currByte the next byte to process
-     * @preturn true if a frame is ready to be retrieved, false otherwise
-     */
-    public boolean process(final byte currByte) throws BeatsFrameException {
-        try {
-            switch (currState) {
-                case VERSION: // Just enough data to process the version
-                    processVERSION(currByte);
-                    break;
-                case FRAMETYPE: // Also able to process the frametype
-                    processFRAMETYPE(currByte);
-                    break;
-                case PAYLOAD: // Initial bytes with version and Frame Type 
have already been received, start iteration over payload
-                    processPAYLOAD(currByte);
-
-                    // At one stage, the data sent to processPAYLOAD will be 
represente a complete frame, so we check before returning true
-
-                    if (frameBuilder.frameType == BeatsFrameType.WINDOWSIZE && 
currState == BeatsState.COMPLETE) {
-                        return true;
-                    } else if (frameBuilder.frameType == 
BeatsFrameType.COMPRESSED && currState == BeatsState.COMPLETE) {
-                        return true;
-                    } else if (frameBuilder.frameType == BeatsFrameType.JSON 
&& currState == BeatsState.COMPLETE) {
-                        return true;
-                    } else {
-                        break;
-                    }
-                case COMPLETE:
-                    return true;
-                default:
-                    break;
-            }
-            return false;
-        } catch (Exception e) {
-            throw new BeatsFrameException("Error decoding Beats frame: " + 
e.getMessage(), e);
-        }
-    }
-
-
-    /**
-     * Returns the decoded frame and resets the decoder for the next frame.
-     * This method should be called after checking isComplete().
-     *
-     * @return the BeatsFrame that was decoded
-     */
-    public List<BeatsFrame> getFrames() throws BeatsFrameException {
-        List<BeatsFrame> frames = new LinkedList<>();
-
-        if (currState != BeatsState.COMPLETE) {
-            throw new BeatsFrameException("Must be at the trailer of a frame");
-        }
-        try {
-            // Once compressed frames are expanded, they must be devided into 
individual frames
-            if (currState == BeatsState.COMPLETE && frameBuilder.frameType == 
BeatsFrameType.COMPRESSED) {
-                logger.debug("Frame is compressed, will iterate to decode", 
new Object[]{});
-
-                // Zero currBytes, currState and frameBuilder prior to 
iteration over
-                // decompressed bytes
-                currBytes.reset();
-                frameBuilder.reset();
-                currState = BeatsState.VERSION;
-
-                // Run over decompressed data and split frames
-                frames = splitCompressedFrames(unprocessedData);
-
-            // In case of V or wired D and J frames we just ship them across 
the List
-            } else {
-                final BeatsFrame frame = frameBuilder.build();
-                currBytes.reset();
-                frameBuilder.reset();
-                currState = BeatsState.VERSION;
-                frames.add(frame);
-            }
-            return frames;
-
-        } catch (Exception e) {
-            throw new BeatsFrameException("Error decoding Beats frame: " + 
e.getMessage(), e);
-        }
-    }
-
-    private List<BeatsFrame> splitCompressedFrames(byte[] decompressedData) {
-        List<BeatsFrame> frames = new LinkedList<>();
-        BeatsFrame.Builder internalFrameBuilder = new BeatsFrame.Builder();
-        ByteBuffer currentData = ByteBuffer.wrap(decompressedData);
-
-        // Both Lumberjack v1 and Beats (LJ v2) has a weird approach to 
frames, where compressed frames embed D(ata) or J(SON) frames.
-        // inside a compressed input.
-        //  Or as stated in the documentation:
-        //
-        // "As an example, you could have 3 data frames compressed into a 
single
-        // 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
-        //
-        // Therefore, instead of calling process method again, just iterate 
over each of
-        // the frames and split them so they can be processed
-
-        while (currentData.hasRemaining()) {
-
-            int payloadLength = 0;
-
-            internalFrameBuilder.version = currentData.get();
-            internalFrameBuilder.frameType = currentData.get();
-            switch (internalFrameBuilder.frameType) {
-                case BeatsFrameType.JSON:
-
-                    internalFrameBuilder.seqNumber = (int) 
(currentData.getInt() & 0x00000000ffffffffL);
-                    currentData.mark();
-
-                    internalFrameBuilder.dataSize = currentData.getInt() & 
0x00000000ffffffffL;
-                    currentData.mark();
-
-                    // Define how much data to chomp
-                    payloadLength = 
Math.toIntExact(internalFrameBuilder.dataSize);
-                    byte[] jsonBytes = new byte[payloadLength];
-
-                    currentData.get(jsonBytes, 0, payloadLength);
-                    currentData.mark();
-
-                    // Add payload to frame
-                    internalFrameBuilder.payload(jsonBytes);
-                    break;
-            }
-
-            // data frame is created
-            BeatsFrame frame = internalFrameBuilder.build();
-            frames.add(frame);
-            internalFrameBuilder.reset();
-        }
-
-        return frames;
-    }
-
-
-    private void processVERSION(final byte b) {
-        byte version = b;
-        frameBuilder.version(version);
-        logger.debug("Version number is {}", new Object[]{version});
-        currBytes.write(b);
-        currState = BeatsState.FRAMETYPE;
-    }
-
-    private void processFRAMETYPE(final byte b) {
-        decodedFrameType = b;
-        frameBuilder.frameType(decodedFrameType);
-        logger.debug("Frame type is {}", new Object[]{decodedFrameType});
-        currBytes.write(b);
-        currState = BeatsState.PAYLOAD;
-    }
-
-
-    /** Process the outer PAYLOAD byte by byte. Once data is read state is set 
to COMPLETE so that the data payload
-     * can be processed fully using {@link #splitCompressedFrames(byte[])}
-     * */
-    private void processPAYLOAD(final byte b) {
-        currBytes.write(b);
-        switch (decodedFrameType) {
-            case BeatsFrameType.WINDOWSIZE: //'W'
-                if (currBytes.size() < WINDOWSIZE_LENGTH ) {
-                    logger.trace("Beats currBytes contents are {}", new 
Object[] {currBytes.toString()});
-                    break;
-                } else if (currBytes.size() == WINDOWSIZE_LENGTH) {
-                    frameBuilder.dataSize = 
ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 
6)).getInt() & 0x00000000ffffffffL;
-                    logger.debug("Data size is {}", new 
Object[]{frameBuilder.dataSize});
-                    // Sets payload  to empty as frame contains no data
-                    frameBuilder.payload(new byte[]{});
-                    currBytes.reset();
-                    currState = BeatsState.COMPLETE;
-                    windowSize = frameBuilder.dataSize;
-                    break;
-                } else { // Should never be here to be honest...
-                    logger.debug("Saw a packet I should not have seen. Packet 
contents were {}", new Object[] {currBytes.toString()});
-                    break;
-                }
-            case BeatsFrameType.COMPRESSED: //'C'
-                if (currBytes.size() < COMPRESSED_MIN_LENGTH) {
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Beats currBytes contents are {}", new 
Object[] {currBytes.toString()});
-                    }
-                    break;
-                } else if (currBytes.size() >= COMPRESSED_MIN_LENGTH) {
-                    // If data contains more thant the minimum data size
-                    frameBuilder.dataSize = 
ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 
6)).getInt() & 0x00000000ffffffffL;
-                    if (currBytes.size() - 6 == frameBuilder.dataSize) {
-                        try {
-                            byte[] buf = 
java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size());
-                            InputStream in = new InflaterInputStream(new 
ByteArrayInputStream(buf));
-                            ByteArrayOutputStream out = new 
ByteArrayOutputStream();
-                            byte[] buffer = new byte[1024];
-                            int len;
-                            while ((len = in.read(buffer)) > 0) {
-                                out.write(buffer, 0, len);
-                            }
-                            in.close();
-                            out.close();
-                            unprocessedData = out.toByteArray();
-                            // buf is no longer needed
-                            buf = null;
-                            logger.debug("Finished decompressing data");
-                            // Decompression is complete, we should be able to 
proceed with resetting currBytes and curSrtate and iterating them
-                            // as type 'D' frames
-                            frameBuilder.dataSize(unprocessedData.length);
-                            currState = BeatsState.COMPLETE;
-
-                        } catch (IOException e) {
-                            throw new BeatsFrameException("Error decompressing 
 frame: " + e.getMessage(), e);
-                        }
-
-                    }
-                    break;
-                    // If currentByte.size is not lower than six and also not 
equal or great than 6...
-                } else { // Should never be here to be honest...
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Received a compressed frame with partial 
data or invalid content. The packet contents were {}", new Object[] 
{currBytes.toString()});
-                    }
-                    break;
-                }
-            case BeatsFrameType.JSON: // 'J́'
-                // Because Beats can disable compression, sometimes, JSON data 
will be received outside a compressed
-                // stream (i.e. 0x43). Instead of processing it here, we defer 
its processing to went getFrames is
-                // called
-                if (currBytes.size() < JSON_MIN_LENGTH) {
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Beats currBytes contents are {}", new 
Object[] {currBytes.toString()});
-                    }
-                    break;
-                } else if (currBytes.size() == JSON_MIN_LENGTH) {
-                    // Read the sequence number from bytes
-                    frameBuilder.seqNumber = (int) 
(ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2, 
6)).getInt() & 0x00000000ffffffffL);
-                    // Read the JSON payload length
-                    frameBuilder.dataSize = 
ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, 
10)).getInt() & 0x00000000ffffffffL;
-                } else if (currBytes.size() > JSON_MIN_LENGTH) {
-                    // Wait for payload to be fully read and then complete 
processing
-                    if (currBytes.size() - 10 == frameBuilder.dataSize) {
-                        // Transfer the current payload so it can be processed 
by {@link #splitCompressedFrames} method.
-                        frameBuilder.payload = 
java.util.Arrays.copyOfRange(currBytes.toByteArray(), 10, currBytes.size());
-                        currState = BeatsState.COMPLETE;
-                    }
-                    break;
-                }
-        }
-    }
-
-}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
deleted file mode 100644
index 8463d48ed2..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-/**
- * Encodes a BeatsFrame into raw bytes using the given charset.
- */
-public class BeatsEncoder {
-
-
-    public byte[] encode(final BeatsFrame frame) {
-        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
-        // Writes the version
-        buffer.write(frame.getVersion());
-
-        // Writes the frameType
-        buffer.write(frame.getFrameType());
-
-        // Writes the sequence number
-        try {
-            buffer.write(frame.getPayload());
-        } catch (IOException e) {
-            throw new BeatsFrameException("Error decoding Beats frame: " + 
e.getMessage(), e);
-        }
-
-        return buffer.toByteArray();
-    }
-
-}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
deleted file mode 100644
index ccb3bba366..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-
-/**
- * A frame received from a channel.
- */
-public class BeatsFrame {
-
-    public static final byte DELIMITER = 10;
-
-    private final byte version;
-    private final byte frameType;
-    private final byte[] payload;
-    private final long dataSize;
-    private final long seqNumber;
-
-    private BeatsFrame(final Builder builder) {
-        this.version = builder.version;
-        this.frameType = builder.frameType;
-        this.payload = builder.payload;
-        this.dataSize = builder.dataSize;
-        this.seqNumber = builder.seqNumber;
-
-        if (version < 2 ||  payload.length < 0 ) {
-            throw new BeatsFrameException("Invalid Frame");
-        }
-    }
-
-    public long getSeqNumber() {
-        return seqNumber;
-    }
-
-    public byte getVersion() {
-        return version;
-    }
-
-    public byte getFrameType() {
-        return frameType;
-    }
-
-    public byte [] getPayload() {
-        return payload;
-    }
-
-    /**
-     * Builder for a BeatsFrame.
-     */
-    public static class Builder {
-
-        byte version;
-        byte frameType;
-        byte [] payload;
-        long dataSize;
-        int seqNumber;
-
-        public Builder() {
-            reset();
-        }
-
-        public void reset() {
-            version = -1;
-            seqNumber = -1;
-            frameType = -1;
-            payload = null;
-        }
-
-        public Builder version(final byte version) {
-            this.version = version;
-            return this;
-        }
-
-        public Builder seqNumber(final int seqNumber) {
-            this.seqNumber = seqNumber;
-            return this;
-        }
-
-        public Builder frameType(final byte frameType) {
-            this.frameType = frameType;
-            return this;
-        }
-
-        public Builder dataSize(final long dataSize) {
-            this.dataSize = dataSize;
-            return this;
-        }
-
-        public Builder payload(final byte [] payload) {
-            this.payload = payload;
-            return this;
-        }
-
-
-        public BeatsFrame build() {
-            return new BeatsFrame(this);
-        }
-
-    }
-
-}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
new file mode 100644
index 0000000000..c620456431
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.Batch;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.protocol.MessageAck;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Batch Channel Inbound Handler processes a batch of messages and sends an 
acknowledgement for the last sequence number
+ */
[email protected]
+public class BatchChannelInboundHandler extends 
SimpleChannelInboundHandler<Batch> {
+    private final ComponentLog log;
+
+    private final BlockingQueue<BatchMessage> messages;
+
+    /**
+     * Batch Channel Inbound Handler with required arguments
+     *
+     * @param log Processor Log
+     * @param messages Queue of messages
+     */
+    public BatchChannelInboundHandler(final ComponentLog log, final 
BlockingQueue<BatchMessage> messages) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+        this.messages = Objects.requireNonNull(messages, "Message Queue 
required");
+    }
+
+    /**
+     * Channel Read processes a batch of messages and sends an acknowledgement 
for the last sequence number
+     *
+     * @param context Channel Handler Context
+     * @param batch Batch of messages
+     */
+    @Override
+    protected void channelRead0(final ChannelHandlerContext context, final 
Batch batch) {
+        Integer lastSequenceNumber = null;
+
+        final Collection<BatchMessage> batchMessages = batch.getMessages();
+        int queued = 0;
+        for (final BatchMessage batchMessage : batchMessages) {
+            final int sequenceNumber = batchMessage.getSequenceNumber();
+            final String sender = batchMessage.getSender();
+            if (messages.offer(batchMessage)) {
+                log.debug("Message Sequence Number [{}] Sender [{}] queued", 
sequenceNumber, sender);
+                lastSequenceNumber = batchMessage.getSequenceNumber();
+                queued++;
+            } else {
+                log.warn("Message Sequence Number [{}] Sender [{}] queuing 
failed: Queued [{}] of [{}]", sequenceNumber, sender, queued, 
batchMessages.size());
+                break;
+            }
+        }
+
+        if (lastSequenceNumber == null) {
+            log.warn("Batch Messages [{}] queuing failed", 
batch.getMessages().size());
+        } else {
+            final MessageAck messageAck = new MessageAck(lastSequenceNumber);
+            context.writeAndFlush(messageAck);
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
new file mode 100644
index 0000000000..4aec512c22
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.Batch;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.protocol.FrameType;
+import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder;
+import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder;
+import org.apache.nifi.processors.beats.protocol.ProtocolException;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterOutputStream;
+
+/**
+ * Byte Buffer to Batch Decoder parses bytes to batches of Beats messages
+ */
+public class BatchDecoder extends ByteToMessageDecoder {
+    private static final int INITIAL_WINDOW_SIZE = 1;
+
+    private static final int INITIAL_QUEUE_SIZE = 1;
+
+    private static final int CODE_READABLE_BYTES = 1;
+
+    private static final int INT_READABLE_BYTES = 4;
+
+    private static final ProtocolCodeDecoder<ProtocolVersion> VERSION_DECODER 
= new ProtocolVersionDecoder();
+
+    private static final ProtocolCodeDecoder<FrameType> FRAME_TYPE_DECODER = 
new FrameTypeDecoder();
+
+    private final ComponentLog log;
+
+    private final AtomicReference<ProtocolVersion> versionRef = new 
AtomicReference<>();
+
+    private final AtomicReference<FrameType> frameTypeRef = new 
AtomicReference<>();
+
+    private final AtomicInteger windowSize = new 
AtomicInteger(INITIAL_WINDOW_SIZE);
+
+    private final AtomicReference<Integer> sequenceNumberRef = new 
AtomicReference<>();
+
+    private final AtomicReference<Integer> payloadSizeRef = new 
AtomicReference<>();
+
+    private final AtomicReference<Integer> compressedSizeRef = new 
AtomicReference<>();
+
+    private Queue<BatchMessage> batchMessages = new 
ArrayBlockingQueue<>(INITIAL_QUEUE_SIZE);
+
+    /**
+     * Beats Batch Decoder with required arguments
+     *
+     * @param log Processor Log
+     */
+    public BatchDecoder(final ComponentLog log) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+    }
+
+    /**
+     * Decode Batch of Beats Messages from Byte Buffer
+     *
+     * @param context Channel Handler Context
+     * @param buffer Byte Buffer
+     * @param objects List of Batch objects
+     */
+    @Override
+    protected void decode(final ChannelHandlerContext context, final ByteBuf 
buffer, final List<Object> objects) {
+        final ProtocolVersion protocolVersion = readVersion(buffer);
+        if (ProtocolVersion.VERSION_2 == protocolVersion) {
+            final FrameType frameType = readFrameType(buffer);
+            decodeFrameType(frameType, context, buffer, objects);
+        } else if (ProtocolVersion.VERSION_1 == protocolVersion) {
+            throw new ProtocolException("Protocol Version [1] not supported");
+        }
+    }
+
+    private void decodeFrameType(final FrameType frameType, final 
ChannelHandlerContext context, final ByteBuf buffer, final List<Object> 
batches) {
+        if (frameType == null) {
+            log.trace("Frame Type not found");
+        } else if (FrameType.COMPRESSED == frameType) {
+            processCompressed(context, buffer, batches);
+        } else if (FrameType.WINDOW_SIZE == frameType) {
+            processWindowSize(context, buffer);
+        } else if (FrameType.JSON == frameType) {
+            processJson(context, buffer, batches);
+        } else {
+            final String message = String.format("Frame Type [%s] not 
supported", frameType);
+            throw new ProtocolException(message);
+        }
+    }
+
+    private void processWindowSize(final ChannelHandlerContext context, final 
ByteBuf buffer) {
+        final Integer readWindowSize = readUnsignedInteger(buffer);
+        if (readWindowSize == null) {
+            log.trace("State [Read Window Size] not enough readable bytes");
+        } else {
+            windowSize.getAndSet(readWindowSize);
+            batchMessages = new ArrayBlockingQueue<>(readWindowSize);
+
+            resetFrameTypeVersion();
+            final Channel channel = context.channel();
+            log.debug("Processed Window Size [{}] Local [{}] Remote [{}]", 
readWindowSize, channel.localAddress(), channel.remoteAddress());
+        }
+    }
+
+    private void processCompressed(final ChannelHandlerContext context, final 
ByteBuf buffer, final List<Object> batches) {
+        final Integer readCompressedSize = readCompressedSize(buffer);
+        if (readCompressedSize == null) {
+            log.trace("State [Read Compressed] not enough readable bytes");
+        } else {
+            final int readableBytes = buffer.readableBytes();
+            if (readableBytes >= readCompressedSize) {
+                final Channel channel = context.channel();
+                log.debug("Processing Compressed Size [{}] Local [{}] Remote 
[{}]", readCompressedSize, channel.localAddress(), channel.remoteAddress());
+
+                processCompressed(context, buffer, readCompressedSize, 
batches);
+            } else {
+                log.trace("State [Read Compressed] not enough readable bytes 
[{}] for compressed [{}]", readableBytes, readCompressedSize);
+            }
+        }
+    }
+
+    private void processCompressed(
+            final ChannelHandlerContext context,
+            final ByteBuf buffer,
+            final int compressedSize,
+            final List<Object> batches
+    ) {
+        final ByteBuf inflated = context.alloc().buffer(compressedSize);
+        try {
+            readCompressedBuffer(buffer, inflated, compressedSize);
+
+            // Clear status prior to decoding inflated frames
+            resetSequenceVersionPayloadSize();
+            resetFrameTypeVersion();
+
+            while (inflated.isReadable()) {
+                decode(context, inflated, batches);
+            }
+        } finally {
+            compressedSizeRef.set(null);
+            inflated.release();
+        }
+    }
+
+    private void processJson(final ChannelHandlerContext context, final 
ByteBuf buffer, final List<Object> batches) {
+        final Channel channel = context.channel();
+
+        final Integer sequenceNumber = readSequenceNumber(buffer);
+        if (sequenceNumber == null) {
+            log.trace("State [Read JSON] Sequence Number not found Remote 
[{}]", channel.remoteAddress());
+        } else {
+            final Integer payloadSize = readPayloadSize(buffer);
+            if (payloadSize == null) {
+                log.trace("State [Read JSON] Payload Size not found Remote 
[{}]", channel.remoteAddress());
+            } else {
+                processJson(sequenceNumber, payloadSize, context, buffer, 
batches);
+            }
+        }
+    }
+
+    private void processJson(
+            final int sequenceNumber,
+            final int payloadSize,
+            final ChannelHandlerContext context,
+            final ByteBuf buffer,
+            final List<Object> batches
+    ) {
+        final Channel channel = context.channel();
+
+        final BatchMessage batchMessage = readJsonMessage(context, 
sequenceNumber, payloadSize, buffer);
+        if (batchMessage == null) {
+            log.trace("State [Read JSON] Message not found Remote [{}]", 
channel.remoteAddress());
+        } else {
+            processBatchMessage(batchMessage, batches);
+            log.debug("Processed JSON Message Sequence Number [{}] Payload 
Size [{}] Local [{}] Remote [{}]", sequenceNumber, payloadSize, 
channel.localAddress(), channel.remoteAddress());
+        }
+    }
+
+    private BatchMessage readJsonMessage(
+            final ChannelHandlerContext context,
+            final int sequenceNumber,
+            final int payloadSize,
+            final ByteBuf buffer
+    ) {
+        final BatchMessage batchMessage;
+
+        final int readableBytes = buffer.readableBytes();
+        if (readableBytes >= payloadSize) {
+            final byte[] payload = new byte[payloadSize];
+            buffer.readBytes(payload);
+
+            final Channel channel = context.channel();
+            final String sender = getRemoteHostAddress(channel);
+            batchMessage = new BatchMessage(sender, payload, sequenceNumber);
+        } else {
+            batchMessage = null;
+            log.trace("State [Read JSON] Sequence Number [{}] not enough 
readable bytes [{}] for payload [{}]", sequenceNumber, readableBytes, 
payloadSize);
+        }
+
+        return batchMessage;
+    }
+
+    private String getRemoteHostAddress(final Channel channel) {
+        final String remoteHostAddress;
+
+        final SocketAddress remoteAddress = channel.remoteAddress();
+        if (remoteAddress instanceof InetSocketAddress) {
+            final InetSocketAddress remoteSocketAddress = (InetSocketAddress) 
remoteAddress;
+            final InetAddress address = remoteSocketAddress.getAddress();
+            remoteHostAddress = address.getHostAddress();
+        } else {
+            remoteHostAddress = remoteAddress.toString();
+        }
+
+        return remoteHostAddress;
+    }
+
+    private void processBatchMessage(final BatchMessage batchMessage, final 
List<Object> batches) {
+        if (batchMessages.offer(batchMessage)) {
+            resetSequenceVersionPayloadSize();
+            resetFrameTypeVersion();
+
+            if (windowSize.get() == batchMessages.size()) {
+                final Collection<BatchMessage> messages = new 
ArrayList<>(batchMessages);
+                final Batch batch = new Batch(messages);
+                batches.add(batch);
+
+                resetWindowSize();
+            }
+        } else {
+            final String message = String.format("Received message exceeds 
Window Size [%d]", windowSize.get());
+            throw new ProtocolException(message);
+        }
+    }
+
+    private void readCompressedBuffer(final ByteBuf compressedBuffer, final 
ByteBuf inflated, final int compressedSize) {
+        final Inflater inflater = new Inflater();
+        try (
+                final ByteBufOutputStream outputStream = new 
ByteBufOutputStream(inflated);
+                final InflaterOutputStream inflaterOutputStream = new 
InflaterOutputStream(outputStream, inflater)
+        ) {
+            compressedBuffer.readBytes(inflaterOutputStream, compressedSize);
+        } catch (final IOException e) {
+            final String message = String.format("Read Compressed Payload Size 
[%d] failed", compressedSize);
+            throw new ProtocolException(message, e);
+        } finally {
+            inflater.end();
+        }
+    }
+
+    private Integer readSequenceNumber(final ByteBuf buffer) {
+        if (sequenceNumberRef.get() == null) {
+            final Integer readSequenceNumber = readUnsignedInteger(buffer);
+            if (readSequenceNumber == null) {
+                log.trace("State [Read JSON] not enough readable bytes for 
Sequence Number");
+            } else {
+                sequenceNumberRef.set(readSequenceNumber);
+            }
+        }
+
+        return sequenceNumberRef.get();
+    }
+
+    private Integer readPayloadSize(final ByteBuf buffer) {
+        if (payloadSizeRef.get() == null) {
+            final Integer readPayloadSize = readUnsignedInteger(buffer);
+            if (readPayloadSize == null) {
+                log.trace("State [Read JSON] not enough readable bytes for 
Payload Size");
+            } else {
+                payloadSizeRef.set(readPayloadSize);
+            }
+        }
+
+        return payloadSizeRef.get();
+    }
+
+    private Integer readCompressedSize(final ByteBuf buffer) {
+        if (compressedSizeRef.get() == null) {
+            final Integer readCompressedSize = readUnsignedInteger(buffer);
+            if (readCompressedSize == null) {
+                log.trace("State [Read Compressed] not enough readable bytes 
for Compressed Size");
+            } else {
+                compressedSizeRef.set(readCompressedSize);
+            }
+        }
+
+        return compressedSizeRef.get();
+    }
+
+    private Integer readUnsignedInteger(final ByteBuf buffer) {
+        final Integer number;
+
+        final int readableBytes = buffer.readableBytes();
+        if (readableBytes >= INT_READABLE_BYTES) {
+            final long unsigned = buffer.readUnsignedInt();
+            number = Math.toIntExact(unsigned);
+        } else {
+            number = null;
+        }
+
+        return number;
+    }
+
+    private FrameType readFrameType(final ByteBuf buffer) {
+        if (frameTypeRef.get() == null) {
+            final int readableBytes = buffer.readableBytes();
+            if (readableBytes >= CODE_READABLE_BYTES) {
+                final byte frameTypeCode = buffer.readByte();
+                final FrameType frameType = 
FRAME_TYPE_DECODER.readProtocolCode(frameTypeCode);
+                frameTypeRef.set(frameType);
+            } else {
+                log.trace("State [Read Frame Type] not enough readable bytes 
[{}]", readableBytes);
+            }
+        }
+
+        return frameTypeRef.get();
+    }
+
+    private ProtocolVersion readVersion(final ByteBuf buffer) {
+        if (versionRef.get() == null) {
+            final int readableBytes = buffer.readableBytes();
+            if (readableBytes >= CODE_READABLE_BYTES) {
+                final byte versionCode = buffer.readByte();
+                final ProtocolVersion protocolVersion = 
VERSION_DECODER.readProtocolCode(versionCode);
+                versionRef.set(protocolVersion);
+            } else {
+                log.trace("State [Read Version] not enough readable bytes 
[{}]", readableBytes);
+            }
+        }
+
+        return versionRef.get();
+    }
+
+    private void resetSequenceVersionPayloadSize() {
+        sequenceNumberRef.set(null);
+        payloadSizeRef.set(null);
+    }
+
+    private void resetFrameTypeVersion() {
+        frameTypeRef.set(null);
+        versionRef.set(null);
+    }
+
+    private void resetWindowSize() {
+        windowSize.set(INITIAL_WINDOW_SIZE);
+        batchMessages.clear();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java
new file mode 100644
index 0000000000..f455207791
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.FrameType;
+import org.apache.nifi.processors.beats.protocol.MessageAck;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
+
+import java.util.Objects;
+
+/**
+ * Beats Message Acknowledgement Encoder writes Protocol Version 2 ACK packets 
with a specified sequence number
+ */
[email protected]
+public class MessageAckEncoder extends MessageToByteEncoder<MessageAck> {
+    private final ComponentLog log;
+
+    /**
+     * Message Acknowledgment Encoder with required arguments
+     *
+     * @param log Processor Log
+     */
+    public MessageAckEncoder(final ComponentLog log) {
+        this.log = Objects.requireNonNull(log, "Component Log required");
+    }
+
+    /**
+     * Encode Message Acknowledgement to the buffer with Protocol Version 2 
and ACK Frame Type
+     *
+     * @param context Channel Handler Context
+     * @param messageAck Message Acknowledgement containing Sequence Number
+     * @param buffer Byte Buffer
+     */
+    @Override
+    protected void encode(final ChannelHandlerContext context, final 
MessageAck messageAck, final ByteBuf buffer) {
+        buffer.writeByte(ProtocolVersion.VERSION_2.getCode());
+        buffer.writeByte(FrameType.ACK.getCode());
+
+        final int sequenceNumber = messageAck.getSequenceNumber();
+        buffer.writeInt(sequenceNumber);
+
+        final Channel channel = context.channel();
+        log.debug("Encoded Message Ack Sequence Number [{}] Local [{}] Remote 
[{}]", sequenceNumber, channel.localAddress(), channel.remoteAddress());
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
deleted file mode 100644
index e870660302..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
-import org.apache.nifi.processors.beats.frame.BeatsDecoder;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-import org.apache.nifi.processors.beats.frame.BeatsFrameType;
-import org.apache.nifi.processors.beats.frame.BeatsMetadata;
-
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Decode a Beats message's bytes into a BeatsMessage object
- */
-public class BeatsFrameDecoder extends ByteToMessageDecoder {
-
-    private final Charset charset;
-    private final ComponentLog logger;
-    private final BeatsMessageFactory messageFactory;
-
-    public BeatsFrameDecoder(final ComponentLog logger, final Charset charset) 
{
-        this.charset = charset;
-        this.logger = logger;
-        this.messageFactory = new BeatsMessageFactory();
-    }
-
-    @Override
-    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, 
final List<Object> out) {
-        final int total = in.readableBytes();
-        final String senderSocket = ctx.channel().remoteAddress().toString();
-        final BeatsDecoder decoder = new BeatsDecoder(charset, logger);
-
-        for (int i = 0; i < total; i++) {
-            byte currByte = in.readByte();
-
-            // decode the bytes and once we find the end of a frame, handle 
the frame
-            if (decoder.process(currByte)) {
-                final List<BeatsFrame> frames = decoder.getFrames();
-                for (BeatsFrame frame : frames) {
-                    logger.debug("Received Beats Frame Sender [{}] Transaction 
[{}] Frame Type [{}]",
-                            senderSocket, frame.getSeqNumber(), 
frame.getFrameType());
-                    // Ignore the WINDOW SIZE type frames as they contain no 
payload.
-                    if (frame.getFrameType() != BeatsFrameType.WINDOWSIZE) {
-                        handle(frame, senderSocket, out);
-                    }
-                }
-            }
-        }
-    }
-
-    private void handle(final BeatsFrame frame, final String sender, final 
List<Object> out) {
-        final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender);
-        metadata.put(BeatsMetadata.SEQNUMBER_KEY, 
String.valueOf(frame.getSeqNumber()));
-
-        if (frame.getFrameType() == BeatsFrameType.JSON) {
-            final BeatsMessage event = 
messageFactory.create(frame.getPayload(), metadata);
-            out.add(event);
-        }
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
deleted file mode 100644
index 0518a12545..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.netty;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.beats.frame.BeatsEncoder;
-import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
-import org.apache.nifi.processors.beats.response.BeatsResponse;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Decode data received into a BeatsMessage
- */
[email protected]
-public class BeatsMessageChannelHandler extends 
SimpleChannelInboundHandler<BeatsMessage> {
-
-    private final ComponentLog componentLog;
-    private final BlockingQueue<BeatsMessage> events;
-    private final BeatsEncoder encoder;
-
-    public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events, 
ComponentLog componentLog) {
-        this.events = events;
-        this.componentLog = componentLog;
-        this.encoder = new BeatsEncoder();
-    }
-
-    @Override
-    protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
-        componentLog.debug("Beats Message Received Length [{}] Remote Address 
[{}] ", msg.getMessage().length, msg.getSender());
-        if (events.offer(msg)) {
-            componentLog.debug("Event Queued: Beats Message Sender [{}] 
Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
-            BeatsChannelResponse successResponse = new 
BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber()));
-            
ctx.writeAndFlush(Unpooled.wrappedBuffer(successResponse.toByteArray()));
-        } else {
-            componentLog.warn("Beats Queue Full: Failed Beats Message Sender 
[{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
-        }
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java
similarity index 66%
copy from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
copy to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java
index baa34a2350..1f24405bfb 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java
@@ -14,19 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
+
+import java.util.Collection;
+import java.util.Objects;
 
 /**
- * Represents an error encountered when decoding frames.
+ * Batch of Beats Messages
  */
-public class BeatsFrameException extends RuntimeException {
+public class Batch {
+    private final Collection<BatchMessage> messages;
 
-    public BeatsFrameException(String message) {
-        super(message);
+    public Batch(final Collection<BatchMessage> messages) {
+        this.messages = Objects.requireNonNull(messages, "Message required");
     }
 
-    public BeatsFrameException(String message, Throwable cause) {
-        super(message, cause);
+    public Collection<BatchMessage> getMessages() {
+        return messages;
     }
-
-}
\ No newline at end of file
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java
similarity index 66%
rename from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
rename to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java
index 70f9d4d5a2..3b1c5bb9cc 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java
@@ -14,23 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.protocol;
 
 import org.apache.nifi.event.transport.message.ByteArrayMessage;
 
 /**
- * A Beats message which adds a sequence number to the ByteArrayMessage.
+ * Beats Batch Message containing JSON payload and sequence number
  */
-public class BeatsMessage extends ByteArrayMessage {
+public class BatchMessage extends ByteArrayMessage {
 
-    private final int seqNumber;
+    private final int sequenceNumber;
 
-    public BeatsMessage(final String sender, final byte[] data, final int 
seqNumber) {
-        super(data, sender);
-        this.seqNumber = seqNumber;
+    public BatchMessage(final String sender, final byte[] payload, final int 
sequenceNumber) {
+        super(payload, sender);
+        this.sequenceNumber = sequenceNumber;
     }
 
-    public int getSeqNumber() {
-        return seqNumber;
+    public int getSequenceNumber() {
+        return sequenceNumber;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java
similarity index 69%
copy from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
copy to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java
index baa34a2350..640cd8c364 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java
@@ -14,19 +14,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
 
 /**
- * Represents an error encountered when decoding frames.
+ * Beats Protocol Frame Type
  */
-public class BeatsFrameException extends RuntimeException {
+public enum FrameType implements ProtocolCode {
+    ACK('A'),
 
-    public BeatsFrameException(String message) {
-        super(message);
-    }
+    COMPRESSED('C'),
+
+    DATA('D'),
+
+    JSON('J'),
 
-    public BeatsFrameException(String message, Throwable cause) {
-        super(message, cause);
+    WINDOW_SIZE('W');
+
+    private final int code;
+
+    FrameType(final char code) {
+        this.code = code;
     }
 
-}
\ No newline at end of file
+    @Override
+    public int getCode() {
+        return code;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java
similarity index 55%
copy from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
copy to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java
index 73be08f7cc..3f02d9ab3d 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java
@@ -14,22 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.protocol;
 
-import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
-import org.apache.nifi.processors.beats.frame.BeatsMetadata;
-
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Optional;
 
 /**
- * An EventFactory implementation to create BeatsMessages.
+ * Beats Frame Type Decoder
  */
-public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
+public class FrameTypeDecoder implements ProtocolCodeDecoder<FrameType> {
 
     @Override
-    public BeatsMessage create(final byte[] data, final Map<String, String> 
metadata) {
-        final int sequenceNumber = 
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
-        final String sender = metadata.get(BeatsMetadata.SENDER_KEY);
-        return new BeatsMessage(sender, data, sequenceNumber);
+    public FrameType readProtocolCode(final byte code) {
+        final Optional<FrameType> frameTypeFound = 
Arrays.stream(FrameType.values()).filter(
+                frameType -> frameType.getCode() == code
+        ).findFirst();
+
+        return frameTypeFound.orElseThrow(() -> {
+            final String message = String.format("Frame Type Code [%d] not 
supported", code);
+            return new ProtocolException(message);
+        });
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java
similarity index 71%
rename from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java
rename to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java
index b18cf8537b..2778655226 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java
@@ -14,15 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
 
 /**
- * The stages of parsing of a Beats conversation.
+ * Beats Message Acknowledgement
  */
-public enum BeatsState {
+public class MessageAck {
+    private final int sequenceNumber;
 
-    VERSION, // First stage is parsing the version
-    FRAMETYPE, // Second stage is to be able to read the frame type
-    PAYLOAD, // payload being populated
-    COMPLETE // complete packet handling
-}
\ No newline at end of file
+    public MessageAck(final int sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public int getSequenceNumber() {
+        return sequenceNumber;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java
similarity index 75%
rename from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
rename to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java
index 2dc5a74d89..7112f5ccc1 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java
@@ -14,13 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
 
 /**
- * Metadata keys for Beats message.
+ * Beats Protocol Code interface abstraction
  */
-public interface BeatsMetadata {
-
-    String SEQNUMBER_KEY = "beats.sequencenumber";
-    String SENDER_KEY = "sender";
+public interface ProtocolCode {
+    /**
+     * Get Protocol Code as transmitted over a socket connection
+     *
+     * @return Protocol Code
+     */
+    int getCode();
 }
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java
similarity index 69%
rename from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
rename to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java
index 77a32728c4..a853dffdce 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java
@@ -14,12 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
 
-public final class BeatsFrameType {
-    public static final byte WINDOWSIZE = 0x57;
-    public static final byte DATA = 0x44;
-    public static final byte COMPRESSED = 0x43;
-    public static final byte ACK = 0x41;
-    public static final byte JSON = 0x4a;
+/**
+ * Decoder for Protocol Code byte values
+ *
+ * @param <T> Protocol Code Type
+ */
+public interface ProtocolCodeDecoder<T extends ProtocolCode> {
+    /**
+     * Read Protocol Code
+     *
+     * @param code Code byte value
+     * @return Protocol Code
+     */
+    T readProtocolCode(byte code);
 }
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java
similarity index 58%
copy from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
copy to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java
index baa34a2350..45d7c9cc63 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java
@@ -14,19 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
 
 /**
- * Represents an error encountered when decoding frames.
+ * Beats Protocol Exception
  */
-public class BeatsFrameException extends RuntimeException {
-
-    public BeatsFrameException(String message) {
+public class ProtocolException extends RuntimeException {
+    /**
+     * Protocol Exception constructor with message containing protocol failure 
details
+     *
+     * @param message Protocol failure details
+     */
+    public ProtocolException(final String message) {
         super(message);
     }
 
-    public BeatsFrameException(String message, Throwable cause) {
+    /**
+     * Protocol Exception constructor with message and cause of failure details
+     *
+     * @param message Protocol failure details
+     * @param cause Cause of failure
+     */
+    public ProtocolException(final String message, final Throwable cause) {
         super(message, cause);
     }
-
-}
\ No newline at end of file
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java
similarity index 71%
rename from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
rename to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java
index baa34a2350..fc7972e725 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java
@@ -14,19 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
 
 /**
- * Represents an error encountered when decoding frames.
+ * Beats Protocol Version
  */
-public class BeatsFrameException extends RuntimeException {
+public enum ProtocolVersion implements ProtocolCode {
+    VERSION_1('1'),
 
-    public BeatsFrameException(String message) {
-        super(message);
-    }
+    VERSION_2('2');
+
+    private final int code;
 
-    public BeatsFrameException(String message, Throwable cause) {
-        super(message, cause);
+    ProtocolVersion(final char code) {
+        this.code = code;
     }
 
-}
\ No newline at end of file
+    @Override
+    public int getCode() {
+        return code;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java
similarity index 54%
rename from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
rename to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java
index 73be08f7cc..3f06d382e5 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java
@@ -14,22 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.protocol;
 
-import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
-import org.apache.nifi.processors.beats.frame.BeatsMetadata;
-
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Optional;
 
 /**
- * An EventFactory implementation to create BeatsMessages.
+ * Beats Protocol Version Decoder
  */
-public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
+public class ProtocolVersionDecoder implements 
ProtocolCodeDecoder<ProtocolVersion> {
 
     @Override
-    public BeatsMessage create(final byte[] data, final Map<String, String> 
metadata) {
-        final int sequenceNumber = 
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
-        final String sender = metadata.get(BeatsMetadata.SENDER_KEY);
-        return new BeatsMessage(sender, data, sequenceNumber);
+    public ProtocolVersion readProtocolCode(final byte code) {
+        final Optional<ProtocolVersion> protocolVersionFound = 
Arrays.stream(ProtocolVersion.values()).filter(
+                protocolVersion -> protocolVersion.getCode() == code
+        ).findFirst();
+
+        return protocolVersionFound.orElseThrow(() -> {
+            final String message = String.format("Version Code [%d] not 
supported", code);
+            return new ProtocolException(message);
+        });
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
deleted file mode 100644
index 5890771ab0..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.response;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-import org.apache.nifi.processors.beats.frame.BeatsEncoder;
-
-/**
- * Creates a BeatsFrame for the provided response and returns the encoded 
frame.
- */
-public class BeatsChannelResponse implements ChannelResponse {
-
-    private final BeatsEncoder encoder;
-    private final BeatsResponse response;
-
-    public BeatsChannelResponse(final BeatsEncoder encoder, final 
BeatsResponse response) {
-        this.encoder = encoder;
-        this.response = response;
-    }
-
-    @Override
-    public byte[] toByteArray() {
-        final BeatsFrame frame = response.toFrame();
-        return encoder.encode(frame);
-    }
-
-}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
deleted file mode 100644
index 7f12c9d7d1..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.response;
-
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-
-import java.nio.ByteBuffer;
-
-/**
- 'ack' frame type
-
- SENT FROM READER ONLY
- frame type value: ASCII 'A' aka byte value 0x41
-
- Payload:
- 32bit unsigned sequence number.
-
- */
-public class BeatsResponse {
-    private final int seqNumber;
-    final private byte version = 0x32; // v2
-    final private byte frameType = 0x41; // A or ACK
-
-
-
-    public BeatsResponse(final int seqNumber) {
-        this.seqNumber = seqNumber;
-    }
-
-    /**
-     * Creates a BeatsFrame where the data portion will contain this response.
-     *
-     *
-     * @return a BeatsFrame for for this response
-     */
-    public BeatsFrame toFrame() {
-
-        return new BeatsFrame.Builder()
-                .version(version)
-                .frameType(frameType)
-                .payload(ByteBuffer.allocate(4).putInt(seqNumber).array())
-                .build();
-    }
-
-    public static BeatsResponse ok(final int seqNumber) {
-        return new BeatsResponse(seqNumber);
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java
similarity index 69%
rename from 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
rename to 
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java
index 2e0414189d..35aa3c1547 100644
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java
@@ -14,43 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.server;
 
 import org.apache.nifi.event.transport.configuration.TransportProtocol;
 import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
 import 
org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.handler.BatchChannelInboundHandler;
+import org.apache.nifi.processors.beats.handler.BatchDecoder;
+import org.apache.nifi.processors.beats.handler.MessageAckEncoder;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
 
 import java.net.InetAddress;
-import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.concurrent.BlockingQueue;
 
 /**
- * Netty Event Server Factory implementation for RELP Messages
+ * Beats Message Protocol extends of Netty Event Server Factory
  */
 public class BeatsMessageServerFactory extends NettyEventServerFactory {
-
     /**
-     * RELP Message Server Factory to receive RELP messages
+     * Beats Message Server Factory constructor with standard configuration 
arguments
+     *
      * @param log Component Log
      * @param address Server Address
      * @param port Server Port Number
-     * @param charset Charset to use when decoding RELP messages
      * @param events Blocking Queue for events received
      */
     public BeatsMessageServerFactory(final ComponentLog log,
                                      final InetAddress address,
                                      final int port,
-                                     final Charset charset,
-                                     final BlockingQueue<BeatsMessage> events) 
{
+                                     final BlockingQueue<BatchMessage> events) 
{
         super(address, port, TransportProtocol.TCP);
-        final BeatsMessageChannelHandler beatsChannelHandler = new 
BeatsMessageChannelHandler(events, log);
+        final MessageAckEncoder messageAckEncoder = new MessageAckEncoder(log);
+        final BatchChannelInboundHandler batchChannelInboundHandler = new 
BatchChannelInboundHandler(log, events);
         final LogExceptionChannelHandler logExceptionChannelHandler = new 
LogExceptionChannelHandler(log);
 
         setHandlerSupplier(() -> Arrays.asList(
-                new BeatsFrameDecoder(log, charset),
-                beatsChannelHandler,
+                messageAckEncoder,
+                new BatchDecoder(log),
+                batchChannelInboundHandler,
                 logExceptionChannelHandler
         ));
     }
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java
new file mode 100644
index 0000000000..9b74a19e4f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats;
+
+import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.processors.beats.protocol.FrameType;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.zip.DeflaterOutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ListenBeatsTest {
+
+    private static final String LOCALHOST = "127.0.0.1";
+
+    private static final String LOCALHOST_TRANSIT_URI = "beats://127.0.0.1:%d";
+
+    private static final int ACK_PACKET_LENGTH = 6;
+
+    private static final int FIRST_SEQUENCE_NUMBER = 1;
+
+    private static final int INTEGER_BUFFER_SIZE = 4;
+
+    private static final String JSON_PAYLOAD = 
"{\"@timestamp\":\"2022-10-31T12:30:45.678Z\",\"message\":\"Processing 
Started\"}";
+
+    private static final int WINDOWED_MESSAGES = 50;
+
+    TestRunner runner;
+
+    @BeforeEach
+    void setRunner() {
+        runner = TestRunners.newTestRunner(ListenBeats.class);
+    }
+
+    @Timeout(10)
+    @Test
+    void testRunSingleJsonMessage() throws Exception {
+        final int port = NetworkUtils.getAvailableTcpPort();
+        runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
+
+        startServer();
+
+        try (
+                final Socket socket = new Socket(LOCALHOST, port);
+                final InputStream inputStream = socket.getInputStream();
+                final OutputStream outputStream = socket.getOutputStream()
+        ) {
+            sendMessage(outputStream, FIRST_SEQUENCE_NUMBER);
+            assertAckPacketMatched(inputStream, FIRST_SEQUENCE_NUMBER);
+        }
+
+        assertFlowFilesSuccess(1);
+        assertReceiveEventFound(port);
+    }
+
+    @Timeout(10)
+    @Test
+    void testRunWindowSizeJsonMessages() throws Exception {
+        final int port = NetworkUtils.getAvailableTcpPort();
+        runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
+
+        startServer();
+
+        try (
+                final Socket socket = new Socket(LOCALHOST, port);
+                final InputStream inputStream = socket.getInputStream();
+                final OutputStream outputStream = socket.getOutputStream()
+        ) {
+            sendWindowSize(outputStream);
+
+            for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= 
WINDOWED_MESSAGES; sequenceNumber++) {
+                sendMessage(outputStream, sequenceNumber);
+            }
+
+            assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
+        }
+
+        assertFlowFilesSuccess(WINDOWED_MESSAGES);
+        assertReceiveEventFound(port);
+    }
+
+    @Timeout(10)
+    @Test
+    void testRunWindowSizeCompressedJsonMessages() throws Exception {
+        final int port = NetworkUtils.getAvailableTcpPort();
+        runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
+
+        startServer();
+
+        try (
+                final Socket socket = new Socket(LOCALHOST, port);
+                final InputStream inputStream = socket.getInputStream();
+                final OutputStream outputStream = socket.getOutputStream()
+        ) {
+            sendWindowSize(outputStream);
+
+            final ByteArrayOutputStream compressedOutputStream = new 
ByteArrayOutputStream();
+            final DeflaterOutputStream deflaterOutputStream = new 
DeflaterOutputStream(compressedOutputStream);
+
+            for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <= 
WINDOWED_MESSAGES; sequenceNumber++) {
+                sendMessage(deflaterOutputStream, sequenceNumber);
+            }
+
+            deflaterOutputStream.close();
+            final byte[] compressed = compressedOutputStream.toByteArray();
+            sendCompressed(outputStream, compressed);
+
+            assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
+        }
+
+        assertFlowFilesSuccess(WINDOWED_MESSAGES);
+        assertReceiveEventFound(port);
+    }
+
+    private void startServer() {
+        runner.run(1, false, true);
+    }
+
+    private void assertReceiveEventFound(final int port) {
+        final Optional<ProvenanceEventRecord> receiveRecord = 
runner.getProvenanceEvents().stream().filter(record ->
+                ProvenanceEventType.RECEIVE == record.getEventType()
+        ).findFirst();
+
+        assertTrue(receiveRecord.isPresent());
+        final ProvenanceEventRecord record = receiveRecord.get();
+
+        final String expectedTransitUri = String.format(LOCALHOST_TRANSIT_URI, 
port);
+        assertEquals(expectedTransitUri, record.getTransitUri());
+    }
+
+    private void assertFlowFilesSuccess(final int expectedFlowFiles) {
+        runner.run(expectedFlowFiles, true, false);
+        runner.assertTransferCount(ListenBeats.REL_SUCCESS, expectedFlowFiles);
+
+        final Iterator<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ListenBeats.REL_SUCCESS).iterator();
+        int i = 1;
+        while (flowFiles.hasNext()) {
+            final MockFlowFile flowFile = flowFiles.next();
+            final String content = flowFile.getContent();
+            assertEquals(JSON_PAYLOAD, content, String.format("FlowFile 
Content [%d] not matched", i));
+            i++;
+        }
+    }
+
+    private void sendWindowSize(final OutputStream outputStream) throws 
IOException {
+        outputStream.write(ProtocolVersion.VERSION_2.getCode());
+
+        outputStream.write(FrameType.WINDOW_SIZE.getCode());
+
+        final byte[] windowSize = getUnsignedInteger(WINDOWED_MESSAGES);
+        outputStream.write(windowSize);
+
+        outputStream.flush();
+    }
+
+    private void sendMessage(final OutputStream outputStream, final int 
sequenceNumber) throws IOException {
+        outputStream.write(ProtocolVersion.VERSION_2.getCode());
+
+        outputStream.write(FrameType.JSON.getCode());
+
+        final byte[] sequenceNumberEncoded = 
getUnsignedInteger(sequenceNumber);
+        outputStream.write(sequenceNumberEncoded);
+
+        final int payloadLength = JSON_PAYLOAD.length();
+
+        final byte[] payloadSize = getUnsignedInteger(payloadLength);
+        outputStream.write(payloadSize);
+
+        outputStream.write(JSON_PAYLOAD.getBytes(StandardCharsets.UTF_8));
+
+        outputStream.flush();
+    }
+
+    private void sendCompressed(final OutputStream outputStream, final byte[] 
compressed) throws IOException {
+        outputStream.write(ProtocolVersion.VERSION_2.getCode());
+
+        outputStream.write(FrameType.COMPRESSED.getCode());
+
+        final int payloadLength = compressed.length;
+
+        final byte[] payloadSize = getUnsignedInteger(payloadLength);
+        outputStream.write(payloadSize);
+
+        outputStream.write(compressed);
+
+        outputStream.flush();
+    }
+
+    private void assertAckPacketMatched(final InputStream inputStream, final 
int expectedSequenceNumber) throws IOException {
+        final byte[] ackPacket = new byte[ACK_PACKET_LENGTH];
+        final int bytesRead = inputStream.read(ackPacket);
+
+        assertEquals(ACK_PACKET_LENGTH, bytesRead);
+
+        final ByteBuffer ackPacketBuffer = ByteBuffer.wrap(ackPacket);
+
+        final byte version = ackPacketBuffer.get();
+        assertEquals(ProtocolVersion.VERSION_2.getCode(), version);
+
+        final byte frameType = ackPacketBuffer.get();
+        assertEquals(FrameType.ACK.getCode(), frameType);
+
+        final int sequenceNumber = ackPacketBuffer.getInt();
+        assertEquals(expectedSequenceNumber, sequenceNumber);
+    }
+
+    private byte[] getUnsignedInteger(final int number) {
+        return ByteBuffer.allocate(INTEGER_BUFFER_SIZE).putInt(number).array();
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
deleted file mode 100644
index b1a8b0da57..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import javax.xml.bind.DatatypeConverter;
-import java.nio.ByteBuffer;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-
-public class TestBeatsEncoder {
-    private BeatsEncoder encoder;
-
-
-    @BeforeEach
-    public void setup() {
-        this.encoder = new BeatsEncoder();
-    }
-
-    @Test
-    public void testEncode() {
-        BeatsFrame frame = new BeatsFrame.Builder()
-            .version((byte) 0x31)
-            .frameType((byte) 0x41)
-            .payload(ByteBuffer.allocate(4).putInt(123).array())
-            .build();
-
-        byte[] encoded = encoder.encode(frame);
-
-        assertArrayEquals(DatatypeConverter.parseHexBinary("31410000007B"), 
encoded);
-    }
-}
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
 
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
deleted file mode 100644
index 1d5b28ac51..0000000000
--- 
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public class TestBeatsFrame {
-
-    @Test
-    public void testInvalidVersion() {
-        assertThrows(BeatsFrameException.class, () -> new 
BeatsFrame.Builder().seqNumber(1234).dataSize(3).build());
-    }
-
-    @Test
-    public void testInvalidFrameType() {
-        assertThrows(BeatsFrameException.class, () -> new 
BeatsFrame.Builder().frameType((byte) 0x70).dataSize(5).build());
-    }
-
-    @Test
-    public void testBlankFrameType() {
-        assertThrows(BeatsFrameException.class, () -> new 
BeatsFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build());
-    }
-}
\ No newline at end of file

Reply via email to