This is an automated email from the ASF dual-hosted git repository.
thenatog pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7c1a7da116 NIFI-10737 Corrected ListenBeats buffer handling
7c1a7da116 is described below
commit 7c1a7da1169f66fce490def753f9a0a228a4f75b
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Oct 31 22:30:32 2022 -0500
NIFI-10737 Corrected ListenBeats buffer handling
- Added test class for ListenBeats
- Removed unnecessary dependencies
- Implemented BatchDecoder for reading Beats Protocol frames
- Refactored protocol and handler classes
Signed-off-by: Nathan Gough <[email protected]>
This closes #6608.
---
.../nifi-beats-processors/pom.xml | 46 +--
.../apache/nifi/processors/beats/ListenBeats.java | 180 ++++------
.../nifi/processors/beats/frame/BeatsDecoder.java | 328 ------------------
.../nifi/processors/beats/frame/BeatsEncoder.java | 47 ---
.../nifi/processors/beats/frame/BeatsFrame.java | 115 -------
.../beats/handler/BatchChannelInboundHandler.java | 83 +++++
.../processors/beats/handler/BatchDecoder.java | 380 +++++++++++++++++++++
.../beats/handler/MessageAckEncoder.java | 65 ++++
.../processors/beats/netty/BeatsFrameDecoder.java | 81 -----
.../beats/netty/BeatsMessageChannelHandler.java | 57 ----
.../Batch.java} | 21 +-
.../BatchMessage.java} | 18 +-
.../FrameType.java} | 29 +-
.../FrameTypeDecoder.java} | 25 +-
.../BeatsState.java => protocol/MessageAck.java} | 20 +-
.../ProtocolCode.java} | 15 +-
.../ProtocolCodeDecoder.java} | 21 +-
.../ProtocolException.java} | 25 +-
.../ProtocolVersion.java} | 23 +-
.../ProtocolVersionDecoder.java} | 25 +-
.../beats/response/BeatsChannelResponse.java | 42 ---
.../processors/beats/response/BeatsResponse.java | 62 ----
.../BeatsMessageServerFactory.java | 25 +-
.../nifi/processors/beats/ListenBeatsTest.java | 244 +++++++++++++
.../processors/beats/frame/TestBeatsEncoder.java | 49 ---
.../processors/beats/frame/TestBeatsFrame.java | 39 ---
26 files changed, 995 insertions(+), 1070 deletions(-)
diff --git a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
index 91a030f4e1..1aeccb4b81 100644
--- a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/pom.xml
@@ -34,31 +34,18 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-event-listen</artifactId>
<version>1.19.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-security-socket-ssl</artifactId>
- <version>1.19.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-socket-utils</artifactId>
- <version>1.19.0-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-socket-ssl</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>1.19.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-flowfile-packager</artifactId>
- <version>1.19.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
@@ -71,25 +58,4 @@
<scope>test</scope>
</dependency>
</dependencies>
-
- <profiles>
- <profile>
- <!-- This profile, activating when compiling on Java versions
above 1.8, provides configuration changes to
- allow NiFi to be compiled on those JDKs. -->
- <id>jigsaw</id>
- <activation>
- <jdk>(1.8,)</jdk>
- </activation>
- <dependencies>
- <dependency>
- <groupId>jakarta.xml.bind</groupId>
- <artifactId>jakarta.xml.bind-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish.jaxb</groupId>
- <artifactId>jaxb-runtime</artifactId>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
</project>
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
index 86cc2df339..4248c31dcf 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
@@ -16,20 +16,18 @@
*/
package org.apache.nifi.processors.beats;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -38,14 +36,13 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
-import org.apache.nifi.processors.beats.netty.BeatsMessage;
-import org.apache.nifi.processors.beats.netty.BeatsMessageServerFactory;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
@@ -55,11 +52,9 @@ import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,34 +62,30 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@Tags({"listen", "beats", "tcp", "logs"})
-@CapabilityDescription("Listens for messages sent by libbeat compatible
clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash',
writing its JSON formatted payload " +
- "to the content of a FlowFile." +
- "This processor replaces the now deprecated/removed ListenLumberjack")
+@Tags({"beats", "logstash", "elasticsearch", "log"})
+@CapabilityDescription("Receive messages encoded using the Elasticsearch Beats
protocol and write decoded JSON")
@WritesAttributes({
- @WritesAttribute(attribute = "beats.sender", description = "The sending
host of the messages."),
- @WritesAttribute(attribute = "beats.port", description = "The sending port
the messages were received over."),
- @WritesAttribute(attribute = "beats.sequencenumber", description = "The
sequence number of the message. Only included if <Batch Size> is 1."),
+ @WritesAttribute(attribute = "beats.sender", description = "Internet
Protocol address of the message sender"),
+ @WritesAttribute(attribute = "beats.port", description = "TCP port on
which the Processor received messages"),
+ @WritesAttribute(attribute = "beats.sequencenumber", description = "The
sequence number of the message included for batches containing single
messages"),
@WritesAttribute(attribute = "mime.type", description = "The mime.type of
the content which is application/json")
})
-@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
public class ListenBeats extends AbstractProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
.displayName("SSL Context Service")
- .description("The Controller Service to use in order to obtain an SSL
Context. If this property is set, " +
- "messages will be received over a secure connection.")
- // Nearly all Lumberjack v1 implementations require TLS to work. v2
implementations (i.e. beats) have TLS as optional
+ .description("SSL Context Service is required to enable TLS for socket
connections")
.required(false)
.identifiesControllerService(RestrictedSSLContextService.class)
.build();
public static final PropertyDescriptor CLIENT_AUTH = new
PropertyDescriptor.Builder()
.name("Client Auth")
- .displayName("Client Auth")
- .description("The client authentication policy to use for the SSL
Context. Only used if an SSL Context Service is provided.")
+ .displayName("Client Authentication")
+ .description("Client authentication policy when TLS is enabled")
.required(false)
+ .dependsOn(SSL_CONTEXT_SERVICE)
.allowableValues(ClientAuth.values())
.defaultValue(ClientAuth.REQUIRED.name())
.build();
@@ -104,73 +95,43 @@ public class ListenBeats extends AbstractProcessor {
.description("Messages received successfully will be sent out this
relationship.")
.build();
- protected List<PropertyDescriptor> descriptors;
- protected Set<Relationship> relationships;
+ private static final List<PropertyDescriptor> DESCRIPTORS =
Collections.unmodifiableList(Arrays.asList(
+ ListenerProperties.NETWORK_INTF_NAME,
+ ListenerProperties.PORT,
+ ListenerProperties.RECV_BUFFER_SIZE,
+ ListenerProperties.MAX_MESSAGE_QUEUE_SIZE,
+ ListenerProperties.MAX_SOCKET_BUFFER_SIZE,
+ ListenerProperties.CHARSET,
+ ListenerProperties.MAX_BATCH_SIZE,
+ ListenerProperties.MESSAGE_DELIMITER,
+ ListenerProperties.WORKER_THREADS,
+ SSL_CONTEXT_SERVICE,
+ CLIENT_AUTH
+ ));
+
+ private static final Set<Relationship> RELATIONSHIPS =
Collections.singleton(REL_SUCCESS);
+
protected volatile int port;
- protected volatile BlockingQueue<BeatsMessage> events;
- protected volatile BlockingQueue<BeatsMessage> errorEvents;
+ protected volatile BlockingQueue<BatchMessage> events;
+ protected volatile BlockingQueue<BatchMessage> errorEvents;
protected volatile EventServer eventServer;
protected volatile byte[] messageDemarcatorBytes;
- protected volatile EventBatcher<BeatsMessage> eventBatcher;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
- descriptors.add(ListenerProperties.PORT);
- descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
- descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
- // Deprecated
- descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
- descriptors.add(ListenerProperties.CHARSET);
- descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
- descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
- descriptors.add(ListenerProperties.WORKER_THREADS);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(CLIENT_AUTH);
- this.descriptors = Collections.unmodifiableList(descriptors);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>();
-
- final SSLContextService sslContextService =
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
- if (sslContextService != null &&
!sslContextService.isTrustStoreConfigured()) {
- results.add(new ValidationResult.Builder()
- .explanation("SSL Context Service requires a truststore for
the Beats forwarder client to work correctly")
- .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
- }
-
- final String clientAuth =
validationContext.getProperty(CLIENT_AUTH).getValue();
- if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
- results.add(new ValidationResult.Builder()
- .explanation("Client Auth must be provided when using
TLS/SSL")
- .valid(false).subject("Client Auth").build());
- }
-
- return results;
- }
+ protected volatile EventBatcher<BatchMessage> eventBatcher;
@Override
public final Set<Relationship> getRelationships() {
- return this.relationships;
+ return RELATIONSHIPS;
}
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return descriptors;
+ return DESCRIPTORS;
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
final int workerThreads =
context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
- final int bufferSize =
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final int socketBufferSize =
context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
final String networkInterface =
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
final InetAddress address =
NetworkUtils.getInterfaceAddress(networkInterface);
final Charset charset =
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
@@ -180,7 +141,7 @@ public class ListenBeats extends AbstractProcessor {
final String msgDemarcator = getMessageDemarcator(context);
messageDemarcatorBytes = msgDemarcator.getBytes(charset);
- final NettyEventServerFactory eventFactory = new
BeatsMessageServerFactory(getLogger(), address, port, charset, events);
+ final NettyEventServerFactory eventFactory = new
BeatsMessageServerFactory(getLogger(), address, port, events);
final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
@@ -191,80 +152,75 @@ public class ListenBeats extends AbstractProcessor {
eventFactory.setClientAuth(clientAuth);
}
- eventFactory.setSocketReceiveBuffer(bufferSize);
+ eventFactory.setSocketReceiveBuffer(socketBufferSize);
eventFactory.setWorkerThreads(workerThreads);
eventFactory.setThreadNamePrefix(String.format("%s[%s]",
getClass().getSimpleName(), getIdentifier()));
+
eventFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+ eventFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
try {
eventServer = eventFactory.getEventServer();
- } catch (EventException e) {
+ } catch (final EventException e) {
getLogger().error("Failed to bind to [{}:{}]", address, port, e);
}
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
- EventBatcher<BeatsMessage> eventBatcher = getEventBatcher();
+ EventBatcher<BatchMessage> eventBatcher = getEventBatcher();
final int batchSize =
context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
- Map<String, FlowFileEventBatch<BeatsMessage>> batches =
eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+ Map<String, FlowFileEventBatch<BatchMessage>> batches =
eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
processEvents(session, batches);
}
@OnStopped
- public void stopped() {
- if (eventServer != null) {
+ public void shutdown() {
+ if (eventServer == null) {
+ getLogger().warn("Event Server not configured");
+ } else {
eventServer.shutdown();
}
eventBatcher = null;
}
- private void processEvents(final ProcessSession session, final Map<String,
FlowFileEventBatch<BeatsMessage>> batches) {
- for (Map.Entry<String, FlowFileEventBatch<BeatsMessage>> entry :
batches.entrySet()) {
+ private void processEvents(final ProcessSession session, final Map<String,
FlowFileEventBatch<BatchMessage>> batches) {
+ for (final Map.Entry<String, FlowFileEventBatch<BatchMessage>> entry :
batches.entrySet()) {
FlowFile flowFile = entry.getValue().getFlowFile();
- final List<BeatsMessage> events = entry.getValue().getEvents();
+ final List<BatchMessage> events = entry.getValue().getEvents();
if (flowFile.getSize() == 0L || events.size() == 0) {
session.remove(flowFile);
- getLogger().debug("No data written to FlowFile from batch {};
removing FlowFile", entry.getKey());
continue;
}
final Map<String,String> attributes =
getAttributes(entry.getValue());
flowFile = session.putAllAttributes(flowFile, attributes);
- getLogger().debug("Transferring {} to success", flowFile);
session.transfer(flowFile, REL_SUCCESS);
- session.adjustCounter("FlowFiles Transferred to Success", 1L,
false);
- // the sender and command will be the same for all events based on
the batch key
final String transitUri = getTransitUri(entry.getValue());
session.getProvenanceReporter().receive(flowFile, transitUri);
-
}
- session.commitAsync();
}
- protected String getTransitUri(FlowFileEventBatch<BeatsMessage> batch) {
- final List<BeatsMessage> events = batch.getEvents();
+ private String getTransitUri(final FlowFileEventBatch<BatchMessage> batch)
{
+ final List<BatchMessage> events = batch.getEvents();
final String sender = events.get(0).getSender();
- final String senderHost = sender.startsWith("/") && sender.length() >
1 ? sender.substring(1) : sender;
- return String.format("beats://%s:%d", senderHost, port);
+ return String.format("beats://%s:%d", sender, port);
}
- protected Map<String, String>
getAttributes(FlowFileEventBatch<BeatsMessage> batch) {
- final List<BeatsMessage> events = batch.getEvents();
- // the sender and command will be the same for all events based on the
batch key
+ private Map<String, String> getAttributes(final
FlowFileEventBatch<BatchMessage> batch) {
+ final List<BatchMessage> events = batch.getEvents();
+
final String sender = events.get(0).getSender();
- final int numAttributes = events.size() == 1 ? 5 : 4;
- final Map<String, String> attributes = new HashMap<>(numAttributes);
- attributes.put(beatsAttributes.SENDER.key(), sender);
- attributes.put(beatsAttributes.PORT.key(), String.valueOf(port));
+ final Map<String, String> attributes = new LinkedHashMap<>();
+ attributes.put(BeatsAttributes.SENDER.key(), sender);
+ attributes.put(BeatsAttributes.PORT.key(), String.valueOf(port));
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
- // if there was only one event then we can pass on the transaction
- // NOTE: we could pass on all the transaction ids joined together
+
if (events.size() == 1) {
- attributes.put(beatsAttributes.SEQNUMBER.key(),
String.valueOf(events.get(0).getSeqNumber()));
+ attributes.put(BeatsAttributes.SEQUENCE_NUMBER.key(),
String.valueOf(events.get(0).getSequenceNumber()));
}
return attributes;
}
@@ -275,11 +231,11 @@ public class ListenBeats extends AbstractProcessor {
.replace("\\n", "\n").replace("\\r", "\r").replace("\\t",
"\t");
}
- private EventBatcher<BeatsMessage> getEventBatcher() {
+ private EventBatcher<BatchMessage> getEventBatcher() {
if (eventBatcher == null) {
- eventBatcher = new EventBatcher<BeatsMessage>(getLogger(), events,
errorEvents) {
+ eventBatcher = new EventBatcher<BatchMessage>(getLogger(), events,
errorEvents) {
@Override
- protected String getBatchKey(BeatsMessage event) {
+ protected String getBatchKey(final BatchMessage event) {
return event.getSender();
}
};
@@ -287,14 +243,14 @@ public class ListenBeats extends AbstractProcessor {
return eventBatcher;
}
- public enum beatsAttributes implements FlowFileAttributeKey {
+ private enum BeatsAttributes implements FlowFileAttributeKey {
SENDER("beats.sender"),
PORT("beats.port"),
- SEQNUMBER("beats.sequencenumber");
+ SEQUENCE_NUMBER("beats.sequencenumber");
private final String key;
- beatsAttributes(String key) {
+ BeatsAttributes(String key) {
this.key = key;
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
deleted file mode 100644
index 2fa20ca856..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import org.apache.nifi.logging.ComponentLog;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.zip.InflaterInputStream;
-
-/**
- * Decodes a Beats frame by maintaining a state based on each byte that has
been processed. This class
- * should not be shared by multiple threads.
- */
-public class BeatsDecoder {
-
-
- final ComponentLog logger;
-
- private BeatsFrame.Builder frameBuilder;
- private BeatsState currState = BeatsState.VERSION;
- private byte decodedFrameType;
-
- private byte[] unprocessedData;
-
- private final Charset charset;
- private final ByteArrayOutputStream currBytes;
-
- private long windowSize;
-
- static final int MIN_FRAME_HEADER_LENGTH = 2; // Version + Type
- static final int WINDOWSIZE_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; // 32bit
unsigned window size
- static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; //
32 bit unsigned + payload
- static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit
unsigned sequence number + 32 bit unsigned payload length
-
- /**
- * @param charset the charset to decode bytes from the frame
- */
- public BeatsDecoder(final Charset charset, final ComponentLog logger) {
- this(charset, new ByteArrayOutputStream(4096), logger);
- }
-
- /**
- * @param charset the charset to decode bytes from the frame
- * @param buffer a buffer to use while processing the bytes
- */
- public BeatsDecoder(final Charset charset, final ByteArrayOutputStream
buffer, final ComponentLog logger) {
- this.logger = logger;
- this.charset = charset;
- this.currBytes = buffer;
- this.frameBuilder = new BeatsFrame.Builder();
- this.decodedFrameType = 0x00;
- }
-
- /**
- * Resets this decoder back to its initial state.
- */
- public void reset() {
- frameBuilder = new BeatsFrame.Builder();
- currState = BeatsState.VERSION;
- decodedFrameType = 0x00;
- currBytes.reset();
- }
-
- /**
- * Process the next byte from the channel, updating the builder and state
accordingly.
- *
- * @param currByte the next byte to process
- * @preturn true if a frame is ready to be retrieved, false otherwise
- */
- public boolean process(final byte currByte) throws BeatsFrameException {
- try {
- switch (currState) {
- case VERSION: // Just enough data to process the version
- processVERSION(currByte);
- break;
- case FRAMETYPE: // Also able to process the frametype
- processFRAMETYPE(currByte);
- break;
- case PAYLOAD: // Initial bytes with version and Frame Type
have already been received, start iteration over payload
- processPAYLOAD(currByte);
-
- // At one stage, the data sent to processPAYLOAD will be
represente a complete frame, so we check before returning true
-
- if (frameBuilder.frameType == BeatsFrameType.WINDOWSIZE &&
currState == BeatsState.COMPLETE) {
- return true;
- } else if (frameBuilder.frameType ==
BeatsFrameType.COMPRESSED && currState == BeatsState.COMPLETE) {
- return true;
- } else if (frameBuilder.frameType == BeatsFrameType.JSON
&& currState == BeatsState.COMPLETE) {
- return true;
- } else {
- break;
- }
- case COMPLETE:
- return true;
- default:
- break;
- }
- return false;
- } catch (Exception e) {
- throw new BeatsFrameException("Error decoding Beats frame: " +
e.getMessage(), e);
- }
- }
-
-
- /**
- * Returns the decoded frame and resets the decoder for the next frame.
- * This method should be called after checking isComplete().
- *
- * @return the BeatsFrame that was decoded
- */
- public List<BeatsFrame> getFrames() throws BeatsFrameException {
- List<BeatsFrame> frames = new LinkedList<>();
-
- if (currState != BeatsState.COMPLETE) {
- throw new BeatsFrameException("Must be at the trailer of a frame");
- }
- try {
- // Once compressed frames are expanded, they must be devided into
individual frames
- if (currState == BeatsState.COMPLETE && frameBuilder.frameType ==
BeatsFrameType.COMPRESSED) {
- logger.debug("Frame is compressed, will iterate to decode",
new Object[]{});
-
- // Zero currBytes, currState and frameBuilder prior to
iteration over
- // decompressed bytes
- currBytes.reset();
- frameBuilder.reset();
- currState = BeatsState.VERSION;
-
- // Run over decompressed data and split frames
- frames = splitCompressedFrames(unprocessedData);
-
- // In case of V or wired D and J frames we just ship them across
the List
- } else {
- final BeatsFrame frame = frameBuilder.build();
- currBytes.reset();
- frameBuilder.reset();
- currState = BeatsState.VERSION;
- frames.add(frame);
- }
- return frames;
-
- } catch (Exception e) {
- throw new BeatsFrameException("Error decoding Beats frame: " +
e.getMessage(), e);
- }
- }
-
- private List<BeatsFrame> splitCompressedFrames(byte[] decompressedData) {
- List<BeatsFrame> frames = new LinkedList<>();
- BeatsFrame.Builder internalFrameBuilder = new BeatsFrame.Builder();
- ByteBuffer currentData = ByteBuffer.wrap(decompressedData);
-
- // Both Lumberjack v1 and Beats (LJ v2) has a weird approach to
frames, where compressed frames embed D(ata) or J(SON) frames.
- // inside a compressed input.
- // Or as stated in the documentation:
- //
- // "As an example, you could have 3 data frames compressed into a
single
- // 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
- //
- // Therefore, instead of calling process method again, just iterate
over each of
- // the frames and split them so they can be processed
-
- while (currentData.hasRemaining()) {
-
- int payloadLength = 0;
-
- internalFrameBuilder.version = currentData.get();
- internalFrameBuilder.frameType = currentData.get();
- switch (internalFrameBuilder.frameType) {
- case BeatsFrameType.JSON:
-
- internalFrameBuilder.seqNumber = (int)
(currentData.getInt() & 0x00000000ffffffffL);
- currentData.mark();
-
- internalFrameBuilder.dataSize = currentData.getInt() &
0x00000000ffffffffL;
- currentData.mark();
-
- // Define how much data to chomp
- payloadLength =
Math.toIntExact(internalFrameBuilder.dataSize);
- byte[] jsonBytes = new byte[payloadLength];
-
- currentData.get(jsonBytes, 0, payloadLength);
- currentData.mark();
-
- // Add payload to frame
- internalFrameBuilder.payload(jsonBytes);
- break;
- }
-
- // data frame is created
- BeatsFrame frame = internalFrameBuilder.build();
- frames.add(frame);
- internalFrameBuilder.reset();
- }
-
- return frames;
- }
-
-
- private void processVERSION(final byte b) {
- byte version = b;
- frameBuilder.version(version);
- logger.debug("Version number is {}", new Object[]{version});
- currBytes.write(b);
- currState = BeatsState.FRAMETYPE;
- }
-
- private void processFRAMETYPE(final byte b) {
- decodedFrameType = b;
- frameBuilder.frameType(decodedFrameType);
- logger.debug("Frame type is {}", new Object[]{decodedFrameType});
- currBytes.write(b);
- currState = BeatsState.PAYLOAD;
- }
-
-
- /** Process the outer PAYLOAD byte by byte. Once data is read state is set
to COMPLETE so that the data payload
- * can be processed fully using {@link #splitCompressedFrames(byte[])}
- * */
- private void processPAYLOAD(final byte b) {
- currBytes.write(b);
- switch (decodedFrameType) {
- case BeatsFrameType.WINDOWSIZE: //'W'
- if (currBytes.size() < WINDOWSIZE_LENGTH ) {
- logger.trace("Beats currBytes contents are {}", new
Object[] {currBytes.toString()});
- break;
- } else if (currBytes.size() == WINDOWSIZE_LENGTH) {
- frameBuilder.dataSize =
ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2,
6)).getInt() & 0x00000000ffffffffL;
- logger.debug("Data size is {}", new
Object[]{frameBuilder.dataSize});
- // Sets payload to empty as frame contains no data
- frameBuilder.payload(new byte[]{});
- currBytes.reset();
- currState = BeatsState.COMPLETE;
- windowSize = frameBuilder.dataSize;
- break;
- } else { // Should never be here to be honest...
- logger.debug("Saw a packet I should not have seen. Packet
contents were {}", new Object[] {currBytes.toString()});
- break;
- }
- case BeatsFrameType.COMPRESSED: //'C'
- if (currBytes.size() < COMPRESSED_MIN_LENGTH) {
- if (logger.isTraceEnabled()) {
- logger.trace("Beats currBytes contents are {}", new
Object[] {currBytes.toString()});
- }
- break;
- } else if (currBytes.size() >= COMPRESSED_MIN_LENGTH) {
- // If data contains more thant the minimum data size
- frameBuilder.dataSize =
ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2,
6)).getInt() & 0x00000000ffffffffL;
- if (currBytes.size() - 6 == frameBuilder.dataSize) {
- try {
- byte[] buf =
java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6, currBytes.size());
- InputStream in = new InflaterInputStream(new
ByteArrayInputStream(buf));
- ByteArrayOutputStream out = new
ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int len;
- while ((len = in.read(buffer)) > 0) {
- out.write(buffer, 0, len);
- }
- in.close();
- out.close();
- unprocessedData = out.toByteArray();
- // buf is no longer needed
- buf = null;
- logger.debug("Finished decompressing data");
- // Decompression is complete, we should be able to
proceed with resetting currBytes and curSrtate and iterating them
- // as type 'D' frames
- frameBuilder.dataSize(unprocessedData.length);
- currState = BeatsState.COMPLETE;
-
- } catch (IOException e) {
- throw new BeatsFrameException("Error decompressing
frame: " + e.getMessage(), e);
- }
-
- }
- break;
- // If currentByte.size is not lower than six and also not
equal or great than 6...
- } else { // Should never be here to be honest...
- if (logger.isDebugEnabled()) {
- logger.debug("Received a compressed frame with partial
data or invalid content. The packet contents were {}", new Object[]
{currBytes.toString()});
- }
- break;
- }
- case BeatsFrameType.JSON: // 'J́'
- // Because Beats can disable compression, sometimes, JSON data
will be received outside a compressed
- // stream (i.e. 0x43). Instead of processing it here, we defer
its processing to went getFrames is
- // called
- if (currBytes.size() < JSON_MIN_LENGTH) {
- if (logger.isTraceEnabled()) {
- logger.trace("Beats currBytes contents are {}", new
Object[] {currBytes.toString()});
- }
- break;
- } else if (currBytes.size() == JSON_MIN_LENGTH) {
- // Read the sequence number from bytes
- frameBuilder.seqNumber = (int)
(ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 2,
6)).getInt() & 0x00000000ffffffffL);
- // Read the JSON payload length
- frameBuilder.dataSize =
ByteBuffer.wrap(java.util.Arrays.copyOfRange(currBytes.toByteArray(), 6,
10)).getInt() & 0x00000000ffffffffL;
- } else if (currBytes.size() > JSON_MIN_LENGTH) {
- // Wait for payload to be fully read and then complete
processing
- if (currBytes.size() - 10 == frameBuilder.dataSize) {
- // Transfer the current payload so it can be processed
by {@link #splitCompressedFrames} method.
- frameBuilder.payload =
java.util.Arrays.copyOfRange(currBytes.toByteArray(), 10, currBytes.size());
- currState = BeatsState.COMPLETE;
- }
- break;
- }
- }
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
deleted file mode 100644
index 8463d48ed2..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsEncoder.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-
-/**
- * Encodes a BeatsFrame into raw bytes using the given charset.
- */
-public class BeatsEncoder {
-
-
- public byte[] encode(final BeatsFrame frame) {
- final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-
- // Writes the version
- buffer.write(frame.getVersion());
-
- // Writes the frameType
- buffer.write(frame.getFrameType());
-
- // Writes the sequence number
- try {
- buffer.write(frame.getPayload());
- } catch (IOException e) {
- throw new BeatsFrameException("Error decoding Beats frame: " +
e.getMessage(), e);
- }
-
- return buffer.toByteArray();
- }
-
-}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
deleted file mode 100644
index ccb3bba366..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrame.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-
-/**
- * A frame received from a channel.
- */
-public class BeatsFrame {
-
- public static final byte DELIMITER = 10;
-
- private final byte version;
- private final byte frameType;
- private final byte[] payload;
- private final long dataSize;
- private final long seqNumber;
-
- private BeatsFrame(final Builder builder) {
- this.version = builder.version;
- this.frameType = builder.frameType;
- this.payload = builder.payload;
- this.dataSize = builder.dataSize;
- this.seqNumber = builder.seqNumber;
-
- if (version < 2 || payload.length < 0 ) {
- throw new BeatsFrameException("Invalid Frame");
- }
- }
-
- public long getSeqNumber() {
- return seqNumber;
- }
-
- public byte getVersion() {
- return version;
- }
-
- public byte getFrameType() {
- return frameType;
- }
-
- public byte [] getPayload() {
- return payload;
- }
-
- /**
- * Builder for a BeatsFrame.
- */
- public static class Builder {
-
- byte version;
- byte frameType;
- byte [] payload;
- long dataSize;
- int seqNumber;
-
- public Builder() {
- reset();
- }
-
- public void reset() {
- version = -1;
- seqNumber = -1;
- frameType = -1;
- payload = null;
- }
-
- public Builder version(final byte version) {
- this.version = version;
- return this;
- }
-
- public Builder seqNumber(final int seqNumber) {
- this.seqNumber = seqNumber;
- return this;
- }
-
- public Builder frameType(final byte frameType) {
- this.frameType = frameType;
- return this;
- }
-
- public Builder dataSize(final long dataSize) {
- this.dataSize = dataSize;
- return this;
- }
-
- public Builder payload(final byte [] payload) {
- this.payload = payload;
- return this;
- }
-
-
- public BeatsFrame build() {
- return new BeatsFrame(this);
- }
-
- }
-
-}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
new file mode 100644
index 0000000000..c620456431
--- /dev/null
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.Batch;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.protocol.MessageAck;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Batch Channel Inbound Handler processes a batch of messages and sends an
acknowledgement for the last sequence number
+ */
+@ChannelHandler.Sharable
+public class BatchChannelInboundHandler extends
SimpleChannelInboundHandler<Batch> {
+ private final ComponentLog log;
+
+ private final BlockingQueue<BatchMessage> messages;
+
+ /**
+ * Batch Channel Inbound Handler with required arguments
+ *
+ * @param log Processor Log
+ * @param messages Queue of messages
+ */
+ public BatchChannelInboundHandler(final ComponentLog log, final
BlockingQueue<BatchMessage> messages) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ this.messages = Objects.requireNonNull(messages, "Message Queue
required");
+ }
+
+ /**
+ * Channel Read processes a batch of messages and sends an acknowledgement
for the last sequence number
+ *
+ * @param context Channel Handler Context
+ * @param batch Batch of messages
+ */
+ @Override
+ protected void channelRead0(final ChannelHandlerContext context, final
Batch batch) {
+ Integer lastSequenceNumber = null;
+
+ final Collection<BatchMessage> batchMessages = batch.getMessages();
+ int queued = 0;
+ for (final BatchMessage batchMessage : batchMessages) {
+ final int sequenceNumber = batchMessage.getSequenceNumber();
+ final String sender = batchMessage.getSender();
+ if (messages.offer(batchMessage)) {
+ log.debug("Message Sequence Number [{}] Sender [{}] queued",
sequenceNumber, sender);
+ lastSequenceNumber = batchMessage.getSequenceNumber();
+ queued++;
+ } else {
+ log.warn("Message Sequence Number [{}] Sender [{}] queuing
failed: Queued [{}] of [{}]", sequenceNumber, sender, queued,
batchMessages.size());
+ break;
+ }
+ }
+
+ if (lastSequenceNumber == null) {
+ log.warn("Batch Messages [{}] queuing failed",
batch.getMessages().size());
+ } else {
+ final MessageAck messageAck = new MessageAck(lastSequenceNumber);
+ context.writeAndFlush(messageAck);
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
new file mode 100644
index 0000000000..4aec512c22
--- /dev/null
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BatchDecoder.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.Batch;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
+import org.apache.nifi.processors.beats.protocol.FrameType;
+import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder;
+import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder;
+import org.apache.nifi.processors.beats.protocol.ProtocolException;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterOutputStream;
+
+/**
+ * Byte Buffer to Batch Decoder parses bytes to batches of Beats messages
+ */
+public class BatchDecoder extends ByteToMessageDecoder {
+ private static final int INITIAL_WINDOW_SIZE = 1;
+
+ private static final int INITIAL_QUEUE_SIZE = 1;
+
+ private static final int CODE_READABLE_BYTES = 1;
+
+ private static final int INT_READABLE_BYTES = 4;
+
+ private static final ProtocolCodeDecoder<ProtocolVersion> VERSION_DECODER
= new ProtocolVersionDecoder();
+
+ private static final ProtocolCodeDecoder<FrameType> FRAME_TYPE_DECODER =
new FrameTypeDecoder();
+
+ private final ComponentLog log;
+
+ private final AtomicReference<ProtocolVersion> versionRef = new
AtomicReference<>();
+
+ private final AtomicReference<FrameType> frameTypeRef = new
AtomicReference<>();
+
+ private final AtomicInteger windowSize = new
AtomicInteger(INITIAL_WINDOW_SIZE);
+
+ private final AtomicReference<Integer> sequenceNumberRef = new
AtomicReference<>();
+
+ private final AtomicReference<Integer> payloadSizeRef = new
AtomicReference<>();
+
+ private final AtomicReference<Integer> compressedSizeRef = new
AtomicReference<>();
+
+ private Queue<BatchMessage> batchMessages = new
ArrayBlockingQueue<>(INITIAL_QUEUE_SIZE);
+
+ /**
+ * Beats Batch Decoder with required arguments
+ *
+ * @param log Processor Log
+ */
+ public BatchDecoder(final ComponentLog log) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ }
+
+ /**
+ * Decode Batch of Beats Messages from Byte Buffer
+ *
+ * @param context Channel Handler Context
+ * @param buffer Byte Buffer
+ * @param objects List of Batch objects
+ */
+ @Override
+ protected void decode(final ChannelHandlerContext context, final ByteBuf
buffer, final List<Object> objects) {
+ final ProtocolVersion protocolVersion = readVersion(buffer);
+ if (ProtocolVersion.VERSION_2 == protocolVersion) {
+ final FrameType frameType = readFrameType(buffer);
+ decodeFrameType(frameType, context, buffer, objects);
+ } else if (ProtocolVersion.VERSION_1 == protocolVersion) {
+ throw new ProtocolException("Protocol Version [1] not supported");
+ }
+ }
+
+ private void decodeFrameType(final FrameType frameType, final
ChannelHandlerContext context, final ByteBuf buffer, final List<Object>
batches) {
+ if (frameType == null) {
+ log.trace("Frame Type not found");
+ } else if (FrameType.COMPRESSED == frameType) {
+ processCompressed(context, buffer, batches);
+ } else if (FrameType.WINDOW_SIZE == frameType) {
+ processWindowSize(context, buffer);
+ } else if (FrameType.JSON == frameType) {
+ processJson(context, buffer, batches);
+ } else {
+ final String message = String.format("Frame Type [%s] not
supported", frameType);
+ throw new ProtocolException(message);
+ }
+ }
+
+ private void processWindowSize(final ChannelHandlerContext context, final
ByteBuf buffer) {
+ final Integer readWindowSize = readUnsignedInteger(buffer);
+ if (readWindowSize == null) {
+ log.trace("State [Read Window Size] not enough readable bytes");
+ } else {
+ windowSize.getAndSet(readWindowSize);
+ batchMessages = new ArrayBlockingQueue<>(readWindowSize);
+
+ resetFrameTypeVersion();
+ final Channel channel = context.channel();
+ log.debug("Processed Window Size [{}] Local [{}] Remote [{}]",
readWindowSize, channel.localAddress(), channel.remoteAddress());
+ }
+ }
+
+ private void processCompressed(final ChannelHandlerContext context, final
ByteBuf buffer, final List<Object> batches) {
+ final Integer readCompressedSize = readCompressedSize(buffer);
+ if (readCompressedSize == null) {
+ log.trace("State [Read Compressed] not enough readable bytes");
+ } else {
+ final int readableBytes = buffer.readableBytes();
+ if (readableBytes >= readCompressedSize) {
+ final Channel channel = context.channel();
+ log.debug("Processing Compressed Size [{}] Local [{}] Remote
[{}]", readCompressedSize, channel.localAddress(), channel.remoteAddress());
+
+ processCompressed(context, buffer, readCompressedSize,
batches);
+ } else {
+ log.trace("State [Read Compressed] not enough readable bytes
[{}] for compressed [{}]", readableBytes, readCompressedSize);
+ }
+ }
+ }
+
+ private void processCompressed(
+ final ChannelHandlerContext context,
+ final ByteBuf buffer,
+ final int compressedSize,
+ final List<Object> batches
+ ) {
+ final ByteBuf inflated = context.alloc().buffer(compressedSize);
+ try {
+ readCompressedBuffer(buffer, inflated, compressedSize);
+
+ // Clear status prior to decoding inflated frames
+ resetSequenceVersionPayloadSize();
+ resetFrameTypeVersion();
+
+ while (inflated.isReadable()) {
+ decode(context, inflated, batches);
+ }
+ } finally {
+ compressedSizeRef.set(null);
+ inflated.release();
+ }
+ }
+
+ private void processJson(final ChannelHandlerContext context, final
ByteBuf buffer, final List<Object> batches) {
+ final Channel channel = context.channel();
+
+ final Integer sequenceNumber = readSequenceNumber(buffer);
+ if (sequenceNumber == null) {
+ log.trace("State [Read JSON] Sequence Number not found Remote
[{}]", channel.remoteAddress());
+ } else {
+ final Integer payloadSize = readPayloadSize(buffer);
+ if (payloadSize == null) {
+ log.trace("State [Read JSON] Payload Size not found Remote
[{}]", channel.remoteAddress());
+ } else {
+ processJson(sequenceNumber, payloadSize, context, buffer,
batches);
+ }
+ }
+ }
+
+ private void processJson(
+ final int sequenceNumber,
+ final int payloadSize,
+ final ChannelHandlerContext context,
+ final ByteBuf buffer,
+ final List<Object> batches
+ ) {
+ final Channel channel = context.channel();
+
+ final BatchMessage batchMessage = readJsonMessage(context,
sequenceNumber, payloadSize, buffer);
+ if (batchMessage == null) {
+ log.trace("State [Read JSON] Message not found Remote [{}]",
channel.remoteAddress());
+ } else {
+ processBatchMessage(batchMessage, batches);
+ log.debug("Processed JSON Message Sequence Number [{}] Payload
Size [{}] Local [{}] Remote [{}]", sequenceNumber, payloadSize,
channel.localAddress(), channel.remoteAddress());
+ }
+ }
+
+ private BatchMessage readJsonMessage(
+ final ChannelHandlerContext context,
+ final int sequenceNumber,
+ final int payloadSize,
+ final ByteBuf buffer
+ ) {
+ final BatchMessage batchMessage;
+
+ final int readableBytes = buffer.readableBytes();
+ if (readableBytes >= payloadSize) {
+ final byte[] payload = new byte[payloadSize];
+ buffer.readBytes(payload);
+
+ final Channel channel = context.channel();
+ final String sender = getRemoteHostAddress(channel);
+ batchMessage = new BatchMessage(sender, payload, sequenceNumber);
+ } else {
+ batchMessage = null;
+ log.trace("State [Read JSON] Sequence Number [{}] not enough
readable bytes [{}] for payload [{}]", sequenceNumber, readableBytes,
payloadSize);
+ }
+
+ return batchMessage;
+ }
+
+ private String getRemoteHostAddress(final Channel channel) {
+ final String remoteHostAddress;
+
+ final SocketAddress remoteAddress = channel.remoteAddress();
+ if (remoteAddress instanceof InetSocketAddress) {
+ final InetSocketAddress remoteSocketAddress = (InetSocketAddress)
remoteAddress;
+ final InetAddress address = remoteSocketAddress.getAddress();
+ remoteHostAddress = address.getHostAddress();
+ } else {
+ remoteHostAddress = remoteAddress.toString();
+ }
+
+ return remoteHostAddress;
+ }
+
+ private void processBatchMessage(final BatchMessage batchMessage, final
List<Object> batches) {
+ if (batchMessages.offer(batchMessage)) {
+ resetSequenceVersionPayloadSize();
+ resetFrameTypeVersion();
+
+ if (windowSize.get() == batchMessages.size()) {
+ final Collection<BatchMessage> messages = new
ArrayList<>(batchMessages);
+ final Batch batch = new Batch(messages);
+ batches.add(batch);
+
+ resetWindowSize();
+ }
+ } else {
+ final String message = String.format("Received message exceeds
Window Size [%d]", windowSize.get());
+ throw new ProtocolException(message);
+ }
+ }
+
+ private void readCompressedBuffer(final ByteBuf compressedBuffer, final
ByteBuf inflated, final int compressedSize) {
+ final Inflater inflater = new Inflater();
+ try (
+ final ByteBufOutputStream outputStream = new
ByteBufOutputStream(inflated);
+ final InflaterOutputStream inflaterOutputStream = new
InflaterOutputStream(outputStream, inflater)
+ ) {
+ compressedBuffer.readBytes(inflaterOutputStream, compressedSize);
+ } catch (final IOException e) {
+ final String message = String.format("Read Compressed Payload Size
[%d] failed", compressedSize);
+ throw new ProtocolException(message, e);
+ } finally {
+ inflater.end();
+ }
+ }
+
+ private Integer readSequenceNumber(final ByteBuf buffer) {
+ if (sequenceNumberRef.get() == null) {
+ final Integer readSequenceNumber = readUnsignedInteger(buffer);
+ if (readSequenceNumber == null) {
+ log.trace("State [Read JSON] not enough readable bytes for
Sequence Number");
+ } else {
+ sequenceNumberRef.set(readSequenceNumber);
+ }
+ }
+
+ return sequenceNumberRef.get();
+ }
+
+ private Integer readPayloadSize(final ByteBuf buffer) {
+ if (payloadSizeRef.get() == null) {
+ final Integer readPayloadSize = readUnsignedInteger(buffer);
+ if (readPayloadSize == null) {
+ log.trace("State [Read JSON] not enough readable bytes for
Payload Size");
+ } else {
+ payloadSizeRef.set(readPayloadSize);
+ }
+ }
+
+ return payloadSizeRef.get();
+ }
+
+ private Integer readCompressedSize(final ByteBuf buffer) {
+ if (compressedSizeRef.get() == null) {
+ final Integer readCompressedSize = readUnsignedInteger(buffer);
+ if (readCompressedSize == null) {
+ log.trace("State [Read Compressed] not enough readable bytes
for Compressed Size");
+ } else {
+ compressedSizeRef.set(readCompressedSize);
+ }
+ }
+
+ return compressedSizeRef.get();
+ }
+
+ private Integer readUnsignedInteger(final ByteBuf buffer) {
+ final Integer number;
+
+ final int readableBytes = buffer.readableBytes();
+ if (readableBytes >= INT_READABLE_BYTES) {
+ final long unsigned = buffer.readUnsignedInt();
+ number = Math.toIntExact(unsigned);
+ } else {
+ number = null;
+ }
+
+ return number;
+ }
+
+ private FrameType readFrameType(final ByteBuf buffer) {
+ if (frameTypeRef.get() == null) {
+ final int readableBytes = buffer.readableBytes();
+ if (readableBytes >= CODE_READABLE_BYTES) {
+ final byte frameTypeCode = buffer.readByte();
+ final FrameType frameType =
FRAME_TYPE_DECODER.readProtocolCode(frameTypeCode);
+ frameTypeRef.set(frameType);
+ } else {
+ log.trace("State [Read Frame Type] not enough readable bytes
[{}]", readableBytes);
+ }
+ }
+
+ return frameTypeRef.get();
+ }
+
+ private ProtocolVersion readVersion(final ByteBuf buffer) {
+ if (versionRef.get() == null) {
+ final int readableBytes = buffer.readableBytes();
+ if (readableBytes >= CODE_READABLE_BYTES) {
+ final byte versionCode = buffer.readByte();
+ final ProtocolVersion protocolVersion =
VERSION_DECODER.readProtocolCode(versionCode);
+ versionRef.set(protocolVersion);
+ } else {
+ log.trace("State [Read Version] not enough readable bytes
[{}]", readableBytes);
+ }
+ }
+
+ return versionRef.get();
+ }
+
+ private void resetSequenceVersionPayloadSize() {
+ sequenceNumberRef.set(null);
+ payloadSizeRef.set(null);
+ }
+
+ private void resetFrameTypeVersion() {
+ frameTypeRef.set(null);
+ versionRef.set(null);
+ }
+
+ private void resetWindowSize() {
+ windowSize.set(INITIAL_WINDOW_SIZE);
+ batchMessages.clear();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java
new file mode 100644
index 0000000000..f455207791
--- /dev/null
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/MessageAckEncoder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.protocol.FrameType;
+import org.apache.nifi.processors.beats.protocol.MessageAck;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
+
+import java.util.Objects;
+
+/**
+ * Beats Message Acknowledgement Encoder writes Protocol Version 2 ACK packets
with a specified sequence number
+ */
+@ChannelHandler.Sharable
+public class MessageAckEncoder extends MessageToByteEncoder<MessageAck> {
+ private final ComponentLog log;
+
+ /**
+ * Message Acknowledgment Encoder with required arguments
+ *
+ * @param log Processor Log
+ */
+ public MessageAckEncoder(final ComponentLog log) {
+ this.log = Objects.requireNonNull(log, "Component Log required");
+ }
+
+ /**
+ * Encode Message Acknowledgement to the buffer with Protocol Version 2
and ACK Frame Type
+ *
+ * @param context Channel Handler Context
+ * @param messageAck Message Acknowledgement containing Sequence Number
+ * @param buffer Byte Buffer
+ */
+ @Override
+ protected void encode(final ChannelHandlerContext context, final
MessageAck messageAck, final ByteBuf buffer) {
+ buffer.writeByte(ProtocolVersion.VERSION_2.getCode());
+ buffer.writeByte(FrameType.ACK.getCode());
+
+ final int sequenceNumber = messageAck.getSequenceNumber();
+ buffer.writeInt(sequenceNumber);
+
+ final Channel channel = context.channel();
+ log.debug("Encoded Message Ack Sequence Number [{}] Local [{}] Remote
[{}]", sequenceNumber, channel.localAddress(), channel.remoteAddress());
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
deleted file mode 100644
index e870660302..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.netty;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
-import org.apache.nifi.processors.beats.frame.BeatsDecoder;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-import org.apache.nifi.processors.beats.frame.BeatsFrameType;
-import org.apache.nifi.processors.beats.frame.BeatsMetadata;
-
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Decode a Beats message's bytes into a BeatsMessage object
- */
-public class BeatsFrameDecoder extends ByteToMessageDecoder {
-
- private final Charset charset;
- private final ComponentLog logger;
- private final BeatsMessageFactory messageFactory;
-
- public BeatsFrameDecoder(final ComponentLog logger, final Charset charset)
{
- this.charset = charset;
- this.logger = logger;
- this.messageFactory = new BeatsMessageFactory();
- }
-
- @Override
- protected void decode(final ChannelHandlerContext ctx, final ByteBuf in,
final List<Object> out) {
- final int total = in.readableBytes();
- final String senderSocket = ctx.channel().remoteAddress().toString();
- final BeatsDecoder decoder = new BeatsDecoder(charset, logger);
-
- for (int i = 0; i < total; i++) {
- byte currByte = in.readByte();
-
- // decode the bytes and once we find the end of a frame, handle
the frame
- if (decoder.process(currByte)) {
- final List<BeatsFrame> frames = decoder.getFrames();
- for (BeatsFrame frame : frames) {
- logger.debug("Received Beats Frame Sender [{}] Transaction
[{}] Frame Type [{}]",
- senderSocket, frame.getSeqNumber(),
frame.getFrameType());
- // Ignore the WINDOW SIZE type frames as they contain no
payload.
- if (frame.getFrameType() != BeatsFrameType.WINDOWSIZE) {
- handle(frame, senderSocket, out);
- }
- }
- }
- }
- }
-
- private void handle(final BeatsFrame frame, final String sender, final
List<Object> out) {
- final Map<String, String> metadata =
EventFactoryUtil.createMapWithSender(sender);
- metadata.put(BeatsMetadata.SEQNUMBER_KEY,
String.valueOf(frame.getSeqNumber()));
-
- if (frame.getFrameType() == BeatsFrameType.JSON) {
- final BeatsMessage event =
messageFactory.create(frame.getPayload(), metadata);
- out.add(event);
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
deleted file mode 100644
index 0518a12545..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.netty;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.beats.frame.BeatsEncoder;
-import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
-import org.apache.nifi.processors.beats.response.BeatsResponse;
-
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Decode data received into a BeatsMessage
- */
-@ChannelHandler.Sharable
-public class BeatsMessageChannelHandler extends
SimpleChannelInboundHandler<BeatsMessage> {
-
- private final ComponentLog componentLog;
- private final BlockingQueue<BeatsMessage> events;
- private final BeatsEncoder encoder;
-
- public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events,
ComponentLog componentLog) {
- this.events = events;
- this.componentLog = componentLog;
- this.encoder = new BeatsEncoder();
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
- componentLog.debug("Beats Message Received Length [{}] Remote Address
[{}] ", msg.getMessage().length, msg.getSender());
- if (events.offer(msg)) {
- componentLog.debug("Event Queued: Beats Message Sender [{}]
Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
- BeatsChannelResponse successResponse = new
BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber()));
-
ctx.writeAndFlush(Unpooled.wrappedBuffer(successResponse.toByteArray()));
- } else {
- componentLog.warn("Beats Queue Full: Failed Beats Message Sender
[{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java
similarity index 66%
copy from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
copy to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java
index baa34a2350..1f24405bfb 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/Batch.java
@@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
+
+import java.util.Collection;
+import java.util.Objects;
/**
- * Represents an error encountered when decoding frames.
+ * Batch of Beats Messages
*/
-public class BeatsFrameException extends RuntimeException {
+public class Batch {
+ private final Collection<BatchMessage> messages;
- public BeatsFrameException(String message) {
- super(message);
+ public Batch(final Collection<BatchMessage> messages) {
+ this.messages = Objects.requireNonNull(messages, "Message required");
}
- public BeatsFrameException(String message, Throwable cause) {
- super(message, cause);
+ public Collection<BatchMessage> getMessages() {
+ return messages;
}
-
-}
\ No newline at end of file
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java
similarity index 66%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java
index 70f9d4d5a2..3b1c5bb9cc 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/BatchMessage.java
@@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.protocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
/**
- * A Beats message which adds a sequence number to the ByteArrayMessage.
+ * Beats Batch Message containing JSON payload and sequence number
*/
-public class BeatsMessage extends ByteArrayMessage {
+public class BatchMessage extends ByteArrayMessage {
- private final int seqNumber;
+ private final int sequenceNumber;
- public BeatsMessage(final String sender, final byte[] data, final int
seqNumber) {
- super(data, sender);
- this.seqNumber = seqNumber;
+ public BatchMessage(final String sender, final byte[] payload, final int
sequenceNumber) {
+ super(payload, sender);
+ this.sequenceNumber = sequenceNumber;
}
- public int getSeqNumber() {
- return seqNumber;
+ public int getSequenceNumber() {
+ return sequenceNumber;
}
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java
similarity index 69%
copy from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
copy to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java
index baa34a2350..640cd8c364 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameType.java
@@ -14,19 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
/**
- * Represents an error encountered when decoding frames.
+ * Beats Protocol Frame Type
*/
-public class BeatsFrameException extends RuntimeException {
+public enum FrameType implements ProtocolCode {
+ ACK('A'),
- public BeatsFrameException(String message) {
- super(message);
- }
+ COMPRESSED('C'),
+
+ DATA('D'),
+
+ JSON('J'),
- public BeatsFrameException(String message, Throwable cause) {
- super(message, cause);
+ WINDOW_SIZE('W');
+
+ private final int code;
+
+ FrameType(final char code) {
+ this.code = code;
}
-}
\ No newline at end of file
+ @Override
+ public int getCode() {
+ return code;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java
similarity index 55%
copy from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
copy to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java
index 73be08f7cc..3f02d9ab3d 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/FrameTypeDecoder.java
@@ -14,22 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.protocol;
-import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
-import org.apache.nifi.processors.beats.frame.BeatsMetadata;
-
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Optional;
/**
- * An EventFactory implementation to create BeatsMessages.
+ * Beats Frame Type Decoder
*/
-public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
+public class FrameTypeDecoder implements ProtocolCodeDecoder<FrameType> {
@Override
- public BeatsMessage create(final byte[] data, final Map<String, String>
metadata) {
- final int sequenceNumber =
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
- final String sender = metadata.get(BeatsMetadata.SENDER_KEY);
- return new BeatsMessage(sender, data, sequenceNumber);
+ public FrameType readProtocolCode(final byte code) {
+ final Optional<FrameType> frameTypeFound =
Arrays.stream(FrameType.values()).filter(
+ frameType -> frameType.getCode() == code
+ ).findFirst();
+
+ return frameTypeFound.orElseThrow(() -> {
+ final String message = String.format("Frame Type Code [%d] not
supported", code);
+ return new ProtocolException(message);
+ });
}
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java
similarity index 71%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java
index b18cf8537b..2778655226 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsState.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/MessageAck.java
@@ -14,15 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
/**
- * The stages of parsing of a Beats conversation.
+ * Beats Message Acknowledgement
*/
-public enum BeatsState {
+public class MessageAck {
+ private final int sequenceNumber;
- VERSION, // First stage is parsing the version
- FRAMETYPE, // Second stage is to be able to read the frame type
- PAYLOAD, // payload being populated
- COMPLETE // complete packet handling
-}
\ No newline at end of file
+ public MessageAck(final int sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java
similarity index 75%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java
index 2dc5a74d89..7112f5ccc1 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCode.java
@@ -14,13 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
/**
- * Metadata keys for Beats message.
+ * Beats Protocol Code interface abstraction
*/
-public interface BeatsMetadata {
-
- String SEQNUMBER_KEY = "beats.sequencenumber";
- String SENDER_KEY = "sender";
+public interface ProtocolCode {
+ /**
+ * Get Protocol Code as transmitted over a socket connection
+ *
+ * @return Protocol Code
+ */
+ int getCode();
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java
similarity index 69%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java
index 77a32728c4..a853dffdce 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolCodeDecoder.java
@@ -14,12 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
-public final class BeatsFrameType {
- public static final byte WINDOWSIZE = 0x57;
- public static final byte DATA = 0x44;
- public static final byte COMPRESSED = 0x43;
- public static final byte ACK = 0x41;
- public static final byte JSON = 0x4a;
+/**
+ * Decoder for Protocol Code byte values
+ *
+ * @param <T> Protocol Code Type
+ */
+public interface ProtocolCodeDecoder<T extends ProtocolCode> {
+ /**
+ * Read Protocol Code
+ *
+ * @param code Code byte value
+ * @return Protocol Code
+ */
+ T readProtocolCode(byte code);
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java
similarity index 58%
copy from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
copy to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java
index baa34a2350..45d7c9cc63 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolException.java
@@ -14,19 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
/**
- * Represents an error encountered when decoding frames.
+ * Beats Protocol Exception
*/
-public class BeatsFrameException extends RuntimeException {
-
- public BeatsFrameException(String message) {
+public class ProtocolException extends RuntimeException {
+ /**
+ * Protocol Exception constructor with message containing protocol failure details
+ *
+ * @param message Protocol failure details
+ */
+ public ProtocolException(final String message) {
super(message);
}
- public BeatsFrameException(String message, Throwable cause) {
+ /**
+ * Protocol Exception constructor with message and cause of failure details
+ *
+ * @param message Protocol failure details
+ * @param cause Cause of failure
+ */
+ public ProtocolException(final String message, final Throwable cause) {
super(message, cause);
}
-
-}
\ No newline at end of file
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java
similarity index 71%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java
index baa34a2350..fc7972e725 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameException.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersion.java
@@ -14,19 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.frame;
+package org.apache.nifi.processors.beats.protocol;
/**
- * Represents an error encountered when decoding frames.
+ * Beats Protocol Version
*/
-public class BeatsFrameException extends RuntimeException {
+public enum ProtocolVersion implements ProtocolCode {
+ VERSION_1('1'),
- public BeatsFrameException(String message) {
- super(message);
- }
+ VERSION_2('2');
+
+ private final int code;
- public BeatsFrameException(String message, Throwable cause) {
- super(message, cause);
+ ProtocolVersion(final char code) {
+ this.code = code;
}
-}
\ No newline at end of file
+ @Override
+ public int getCode() {
+ return code;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java
similarity index 54%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java
index 73be08f7cc..3f06d382e5 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/protocol/ProtocolVersionDecoder.java
@@ -14,22 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.protocol;
-import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
-import org.apache.nifi.processors.beats.frame.BeatsMetadata;
-
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Optional;
/**
- * An EventFactory implementation to create BeatsMessages.
+ * Beats Protocol Version Decoder
*/
-public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
+public class ProtocolVersionDecoder implements
ProtocolCodeDecoder<ProtocolVersion> {
@Override
- public BeatsMessage create(final byte[] data, final Map<String, String>
metadata) {
- final int sequenceNumber =
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
- final String sender = metadata.get(BeatsMetadata.SENDER_KEY);
- return new BeatsMessage(sender, data, sequenceNumber);
+ public ProtocolVersion readProtocolCode(final byte code) {
+ final Optional<ProtocolVersion> protocolVersionFound =
Arrays.stream(ProtocolVersion.values()).filter(
+ protocolVersion -> protocolVersion.getCode() == code
+ ).findFirst();
+
+ return protocolVersionFound.orElseThrow(() -> {
+ final String message = String.format("Version Code [%d] not
supported", code);
+ return new ProtocolException(message);
+ });
}
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
deleted file mode 100644
index 5890771ab0..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsChannelResponse.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.response;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-import org.apache.nifi.processors.beats.frame.BeatsEncoder;
-
-/**
- * Creates a BeatsFrame for the provided response and returns the encoded frame.
- */
-public class BeatsChannelResponse implements ChannelResponse {
-
- private final BeatsEncoder encoder;
- private final BeatsResponse response;
-
- public BeatsChannelResponse(final BeatsEncoder encoder, final
BeatsResponse response) {
- this.encoder = encoder;
- this.response = response;
- }
-
- @Override
- public byte[] toByteArray() {
- final BeatsFrame frame = response.toFrame();
- return encoder.encode(frame);
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
deleted file mode 100644
index 7f12c9d7d1..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.response;
-
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-
-import java.nio.ByteBuffer;
-
-/**
- 'ack' frame type
-
- SENT FROM READER ONLY
- frame type value: ASCII 'A' aka byte value 0x41
-
- Payload:
- 32bit unsigned sequence number.
-
- */
-public class BeatsResponse {
- private final int seqNumber;
- final private byte version = 0x32; // v2
- final private byte frameType = 0x41; // A or ACK
-
-
-
- public BeatsResponse(final int seqNumber) {
- this.seqNumber = seqNumber;
- }
-
- /**
- * Creates a BeatsFrame where the data portion will contain this response.
- *
- *
- * @return a BeatsFrame for for this response
- */
- public BeatsFrame toFrame() {
-
- return new BeatsFrame.Builder()
- .version(version)
- .frameType(frameType)
- .payload(ByteBuffer.allocate(4).putInt(seqNumber).array())
- .build();
- }
-
- public static BeatsResponse ok(final int seqNumber) {
- return new BeatsResponse(seqNumber);
- }
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java
similarity index 69%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java
index 2e0414189d..35aa3c1547 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/server/BeatsMessageServerFactory.java
@@ -14,43 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.netty;
+package org.apache.nifi.processors.beats.server;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
import
org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.handler.BatchChannelInboundHandler;
+import org.apache.nifi.processors.beats.handler.BatchDecoder;
+import org.apache.nifi.processors.beats.handler.MessageAckEncoder;
+import org.apache.nifi.processors.beats.protocol.BatchMessage;
import java.net.InetAddress;
-import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
/**
- * Netty Event Server Factory implementation for RELP Messages
+ * Beats Message Protocol extends of Netty Event Server Factory
*/
public class BeatsMessageServerFactory extends NettyEventServerFactory {
-
/**
- * RELP Message Server Factory to receive RELP messages
+ * Beats Message Server Factory constructor with standard configuration arguments
+ *
* @param log Component Log
* @param address Server Address
* @param port Server Port Number
- * @param charset Charset to use when decoding RELP messages
* @param events Blocking Queue for events received
*/
public BeatsMessageServerFactory(final ComponentLog log,
final InetAddress address,
final int port,
- final Charset charset,
- final BlockingQueue<BeatsMessage> events)
{
+ final BlockingQueue<BatchMessage> events)
{
super(address, port, TransportProtocol.TCP);
- final BeatsMessageChannelHandler beatsChannelHandler = new
BeatsMessageChannelHandler(events, log);
+ final MessageAckEncoder messageAckEncoder = new MessageAckEncoder(log);
+ final BatchChannelInboundHandler batchChannelInboundHandler = new
BatchChannelInboundHandler(log, events);
final LogExceptionChannelHandler logExceptionChannelHandler = new
LogExceptionChannelHandler(log);
setHandlerSupplier(() -> Arrays.asList(
- new BeatsFrameDecoder(log, charset),
- beatsChannelHandler,
+ messageAckEncoder,
+ new BatchDecoder(log),
+ batchChannelInboundHandler,
logExceptionChannelHandler
));
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java
new file mode 100644
index 0000000000..9b74a19e4f
--- /dev/null
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/ListenBeatsTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats;
+
+import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.processors.beats.protocol.FrameType;
+import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.zip.DeflaterOutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class ListenBeatsTest {
+
+ private static final String LOCALHOST = "127.0.0.1";
+
+ private static final String LOCALHOST_TRANSIT_URI = "beats://127.0.0.1:%d";
+
+ private static final int ACK_PACKET_LENGTH = 6;
+
+ private static final int FIRST_SEQUENCE_NUMBER = 1;
+
+ private static final int INTEGER_BUFFER_SIZE = 4;
+
+ private static final String JSON_PAYLOAD =
"{\"@timestamp\":\"2022-10-31T12:30:45.678Z\",\"message\":\"Processing
Started\"}";
+
+ private static final int WINDOWED_MESSAGES = 50;
+
+ TestRunner runner;
+
+ @BeforeEach
+ void setRunner() {
+ runner = TestRunners.newTestRunner(ListenBeats.class);
+ }
+
+ @Timeout(10)
+ @Test
+ void testRunSingleJsonMessage() throws Exception {
+ final int port = NetworkUtils.getAvailableTcpPort();
+ runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
+
+ startServer();
+
+ try (
+ final Socket socket = new Socket(LOCALHOST, port);
+ final InputStream inputStream = socket.getInputStream();
+ final OutputStream outputStream = socket.getOutputStream()
+ ) {
+ sendMessage(outputStream, FIRST_SEQUENCE_NUMBER);
+ assertAckPacketMatched(inputStream, FIRST_SEQUENCE_NUMBER);
+ }
+
+ assertFlowFilesSuccess(1);
+ assertReceiveEventFound(port);
+ }
+
+ @Timeout(10)
+ @Test
+ void testRunWindowSizeJsonMessages() throws Exception {
+ final int port = NetworkUtils.getAvailableTcpPort();
+ runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
+
+ startServer();
+
+ try (
+ final Socket socket = new Socket(LOCALHOST, port);
+ final InputStream inputStream = socket.getInputStream();
+ final OutputStream outputStream = socket.getOutputStream()
+ ) {
+ sendWindowSize(outputStream);
+
+ for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <=
WINDOWED_MESSAGES; sequenceNumber++) {
+ sendMessage(outputStream, sequenceNumber);
+ }
+
+ assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
+ }
+
+ assertFlowFilesSuccess(WINDOWED_MESSAGES);
+ assertReceiveEventFound(port);
+ }
+
+ @Timeout(10)
+ @Test
+ void testRunWindowSizeCompressedJsonMessages() throws Exception {
+ final int port = NetworkUtils.getAvailableTcpPort();
+ runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
+
+ startServer();
+
+ try (
+ final Socket socket = new Socket(LOCALHOST, port);
+ final InputStream inputStream = socket.getInputStream();
+ final OutputStream outputStream = socket.getOutputStream()
+ ) {
+ sendWindowSize(outputStream);
+
+ final ByteArrayOutputStream compressedOutputStream = new
ByteArrayOutputStream();
+ final DeflaterOutputStream deflaterOutputStream = new
DeflaterOutputStream(compressedOutputStream);
+
+ for (int sequenceNumber = FIRST_SEQUENCE_NUMBER; sequenceNumber <=
WINDOWED_MESSAGES; sequenceNumber++) {
+ sendMessage(deflaterOutputStream, sequenceNumber);
+ }
+
+ deflaterOutputStream.close();
+ final byte[] compressed = compressedOutputStream.toByteArray();
+ sendCompressed(outputStream, compressed);
+
+ assertAckPacketMatched(inputStream, WINDOWED_MESSAGES);
+ }
+
+ assertFlowFilesSuccess(WINDOWED_MESSAGES);
+ assertReceiveEventFound(port);
+ }
+
+ private void startServer() {
+ runner.run(1, false, true);
+ }
+
+ private void assertReceiveEventFound(final int port) {
+ final Optional<ProvenanceEventRecord> receiveRecord =
runner.getProvenanceEvents().stream().filter(record ->
+ ProvenanceEventType.RECEIVE == record.getEventType()
+ ).findFirst();
+
+ assertTrue(receiveRecord.isPresent());
+ final ProvenanceEventRecord record = receiveRecord.get();
+
+ final String expectedTransitUri = String.format(LOCALHOST_TRANSIT_URI,
port);
+ assertEquals(expectedTransitUri, record.getTransitUri());
+ }
+
+ private void assertFlowFilesSuccess(final int expectedFlowFiles) {
+ runner.run(expectedFlowFiles, true, false);
+ runner.assertTransferCount(ListenBeats.REL_SUCCESS, expectedFlowFiles);
+
+ final Iterator<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(ListenBeats.REL_SUCCESS).iterator();
+ int i = 1;
+ while (flowFiles.hasNext()) {
+ final MockFlowFile flowFile = flowFiles.next();
+ final String content = flowFile.getContent();
+ assertEquals(JSON_PAYLOAD, content, String.format("FlowFile
Content [%d] not matched", i));
+ i++;
+ }
+ }
+
+ private void sendWindowSize(final OutputStream outputStream) throws
IOException {
+ outputStream.write(ProtocolVersion.VERSION_2.getCode());
+
+ outputStream.write(FrameType.WINDOW_SIZE.getCode());
+
+ final byte[] windowSize = getUnsignedInteger(WINDOWED_MESSAGES);
+ outputStream.write(windowSize);
+
+ outputStream.flush();
+ }
+
+ private void sendMessage(final OutputStream outputStream, final int
sequenceNumber) throws IOException {
+ outputStream.write(ProtocolVersion.VERSION_2.getCode());
+
+ outputStream.write(FrameType.JSON.getCode());
+
+ final byte[] sequenceNumberEncoded =
getUnsignedInteger(sequenceNumber);
+ outputStream.write(sequenceNumberEncoded);
+
+ final int payloadLength = JSON_PAYLOAD.length();
+
+ final byte[] payloadSize = getUnsignedInteger(payloadLength);
+ outputStream.write(payloadSize);
+
+ outputStream.write(JSON_PAYLOAD.getBytes(StandardCharsets.UTF_8));
+
+ outputStream.flush();
+ }
+
+ private void sendCompressed(final OutputStream outputStream, final byte[]
compressed) throws IOException {
+ outputStream.write(ProtocolVersion.VERSION_2.getCode());
+
+ outputStream.write(FrameType.COMPRESSED.getCode());
+
+ final int payloadLength = compressed.length;
+
+ final byte[] payloadSize = getUnsignedInteger(payloadLength);
+ outputStream.write(payloadSize);
+
+ outputStream.write(compressed);
+
+ outputStream.flush();
+ }
+
+ private void assertAckPacketMatched(final InputStream inputStream, final
int expectedSequenceNumber) throws IOException {
+ final byte[] ackPacket = new byte[ACK_PACKET_LENGTH];
+ final int bytesRead = inputStream.read(ackPacket);
+
+ assertEquals(ACK_PACKET_LENGTH, bytesRead);
+
+ final ByteBuffer ackPacketBuffer = ByteBuffer.wrap(ackPacket);
+
+ final byte version = ackPacketBuffer.get();
+ assertEquals(ProtocolVersion.VERSION_2.getCode(), version);
+
+ final byte frameType = ackPacketBuffer.get();
+ assertEquals(FrameType.ACK.getCode(), frameType);
+
+ final int sequenceNumber = ackPacketBuffer.getInt();
+ assertEquals(expectedSequenceNumber, sequenceNumber);
+ }
+
+ private byte[] getUnsignedInteger(final int number) {
+ return ByteBuffer.allocate(INTEGER_BUFFER_SIZE).putInt(number).array();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
deleted file mode 100644
index b1a8b0da57..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsEncoder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import javax.xml.bind.DatatypeConverter;
-import java.nio.ByteBuffer;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-
-
-public class TestBeatsEncoder {
- private BeatsEncoder encoder;
-
-
- @BeforeEach
- public void setup() {
- this.encoder = new BeatsEncoder();
- }
-
- @Test
- public void testEncode() {
- BeatsFrame frame = new BeatsFrame.Builder()
- .version((byte) 0x31)
- .frameType((byte) 0x41)
- .payload(ByteBuffer.allocate(4).putInt(123).array())
- .build();
-
- byte[] encoded = encoder.encode(frame);
-
- assertArrayEquals(DatatypeConverter.parseHexBinary("31410000007B"),
encoded);
- }
-}
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
deleted file mode 100644
index 1d5b28ac51..0000000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/frame/TestBeatsFrame.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.frame;
-
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-public class TestBeatsFrame {
-
- @Test
- public void testInvalidVersion() {
- assertThrows(BeatsFrameException.class, () -> new
BeatsFrame.Builder().seqNumber(1234).dataSize(3).build());
- }
-
- @Test
- public void testInvalidFrameType() {
- assertThrows(BeatsFrameException.class, () -> new
BeatsFrame.Builder().frameType((byte) 0x70).dataSize(5).build());
- }
-
- @Test
- public void testBlankFrameType() {
- assertThrows(BeatsFrameException.class, () -> new
BeatsFrame.Builder().frameType(((byte) 0x00)).dataSize(5).build());
- }
-}
\ No newline at end of file