This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 08153b8 NIFI-9453 Refactored ListenBeats using Netty
08153b8 is described below
commit 08153b826014a252ad078dd17c929fe1e13ad7ba
Author: Nathan Gough <[email protected]>
AuthorDate: Mon Jan 10 23:29:25 2022 -0500
NIFI-9453 Refactored ListenBeats using Netty
This closes #5669
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/beats/ListenBeats.java | 227 +++++++++++++--------
.../nifi/processors/beats/frame/BeatsDecoder.java | 23 +--
.../BeatsFrameType.java} | 14 +-
.../beats/{event => frame}/BeatsMetadata.java | 5 +-
.../beats/handler/BeatsFrameHandler.java | 87 --------
.../handler/BeatsSSLSocketChannelHandler.java | 94 ---------
.../beats/handler/BeatsSocketChannelHandler.java | 103 ----------
.../handler/BeatsSocketChannelHandlerFactory.java | 56 -----
.../processors/beats/netty/BeatsFrameDecoder.java | 81 ++++++++
.../BeatsEvent.java => netty/BeatsMessage.java} | 16 +-
.../beats/netty/BeatsMessageChannelHandler.java | 57 ++++++
.../BeatsMessageFactory.java} | 20 +-
.../beats/netty/BeatsMessageServerFactory.java | 57 ++++++
.../processors/beats/response/BeatsResponse.java | 1 -
.../beats/event/TestBeatsEventFactory.java | 52 -----
.../beats/handler/TestBeatsFrameHandler.java | 157 --------------
.../handler/TestBeatsSocketChannelHandler.java | 227 ---------------------
.../nifi/processors/standard/TestListenRELP.java | 5 +-
18 files changed, 378 insertions(+), 904 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
index 084430a..889f11a 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java
@@ -24,44 +24,47 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.event.transport.EventException;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.processors.beats.event.BeatsEvent;
-import org.apache.nifi.processors.beats.event.BeatsEventFactory;
-import org.apache.nifi.processors.beats.frame.BeatsEncoder;
-import
org.apache.nifi.processors.beats.handler.BeatsSocketChannelHandlerFactory;
-import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
-import org.apache.nifi.processors.beats.response.BeatsResponse;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.EventBatcher;
+import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
+import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.processors.beats.netty.BeatsMessage;
+import org.apache.nifi.processors.beats.netty.BeatsMessageServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
+import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"listen", "beats", "tcp", "logs"})
@@ -75,7 +78,7 @@ import java.util.concurrent.BlockingQueue;
@WritesAttribute(attribute = "mime.type", description = "The mime.type of
the content which is application/json")
})
@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"})
-public class ListenBeats extends
AbstractListenEventBatchingProcessor<BeatsEvent> {
+public class ListenBeats extends AbstractProcessor {
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL_CONTEXT_SERVICE")
@@ -96,13 +99,40 @@ public class ListenBeats extends
AbstractListenEventBatchingProcessor<BeatsEvent
.defaultValue(ClientAuth.REQUIRED.name())
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Messages received successfully will be sent out this
relationship.")
+ .build();
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+ protected volatile int port;
+ protected volatile BlockingQueue<BeatsMessage> events;
+ protected volatile BlockingQueue<BeatsMessage> errorEvents;
+ protected volatile EventServer eventServer;
+ protected volatile byte[] messageDemarcatorBytes;
+ protected volatile EventBatcher<BeatsMessage> eventBatcher;
+
@Override
- protected List<PropertyDescriptor> getAdditionalProperties() {
- return Arrays.asList(
- MAX_CONNECTIONS,
- SSL_CONTEXT_SERVICE,
- CLIENT_AUTH
- );
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+ descriptors.add(ListenerProperties.PORT);
+ descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+ // Deprecated
+ descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.CHARSET);
+ descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+ descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+ descriptors.add(ListenerProperties.WORKER_THREADS);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(CLIENT_AUTH);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
@@ -111,13 +141,12 @@ public class ListenBeats extends
AbstractListenEventBatchingProcessor<BeatsEvent
final SSLContextService sslContextService =
validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- if (sslContextService != null &&
sslContextService.isTrustStoreConfigured() == false) {
+ if (sslContextService != null &&
!sslContextService.isTrustStoreConfigured()) {
results.add(new ValidationResult.Builder()
- .explanation("The context service must have a truststore
configured for the beats forwarder client to work correctly")
+ .explanation("SSL Context Service requires a truststore for
the Beats forwarder client to work correctly")
.valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
}
- // Validate CLIENT_AUTH
final String clientAuth =
validationContext.getProperty(CLIENT_AUTH).getValue();
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
@@ -128,87 +157,103 @@ public class ListenBeats extends
AbstractListenEventBatchingProcessor<BeatsEvent
return results;
}
- private volatile BeatsEncoder beatsEncoder;
-
-
@Override
- @OnScheduled
- public void onScheduled(ProcessContext context) throws IOException {
- super.onScheduled(context);
- // wanted to ensure charset was already populated here
- beatsEncoder = new BeatsEncoder();
+ public final Set<Relationship> getRelationships() {
+ return this.relationships;
}
@Override
- protected ChannelDispatcher createDispatcher(final ProcessContext context,
final BlockingQueue<BeatsEvent> events) throws IOException {
- final EventFactory<BeatsEvent> eventFactory = new BeatsEventFactory();
- final ChannelHandlerFactory<BeatsEvent, AsyncChannelDispatcher>
handlerFactory = new BeatsSocketChannelHandlerFactory<>();
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
- final int maxConnections =
context.getProperty(MAX_CONNECTIONS).asInteger();
- final int bufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
- final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) throws IOException {
+ final int workerThreads =
context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
+ final int bufferSize =
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final String networkInterface =
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+ final InetAddress address =
NetworkUtils.getInterfaceAddress(networkInterface);
+ final Charset charset =
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+ port =
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+ events = new
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+ errorEvents = new LinkedBlockingQueue<>();
+ final String msgDemarcator = getMessageDemarcator(context);
+ messageDemarcatorBytes = msgDemarcator.getBytes(charset);
- // initialize the buffer pool based on max number of connections and
the buffer size
- final ByteBufferSource byteBufferSource = new
ByteBufferPool(maxConnections, bufferSize);
+ final NettyEventServerFactory eventFactory = new
BeatsMessageServerFactory(getLogger(), address, port, charset, events);
- // if an SSLContextService was provided then create an SSLContext to
pass down to the dispatcher
- SSLContext sslContext = null;
- ClientAuth clientAuth = null;
final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
final String clientAuthValue =
context.getProperty(CLIENT_AUTH).getValue();
- sslContext = sslContextService.createContext();
- clientAuth = ClientAuth.valueOf(clientAuthValue);
-
+ ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+ SSLContext sslContext = sslContextService.createContext();
+ eventFactory.setSslContext(sslContext);
+ eventFactory.setClientAuth(clientAuth);
}
- // if we decide to support SSL then get the context and pass it in here
- return new SocketChannelDispatcher<>(eventFactory, handlerFactory,
byteBufferSource, events,
- getLogger(), maxConnections, sslContext, clientAuth, charSet);
- }
+ eventFactory.setSocketReceiveBuffer(bufferSize);
+ eventFactory.setWorkerThreads(workerThreads);
+ eventFactory.setThreadNamePrefix(String.format("%s[%s]",
getClass().getSimpleName(), getIdentifier()));
+ try {
+ eventServer = eventFactory.getEventServer();
+ } catch (EventException e) {
+ getLogger().error("Failed to bind to [{}:{}]", address, port, e);
+ }
+ }
@Override
- protected String getBatchKey(BeatsEvent event) {
- return event.getSender();
- }
+ public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
+ EventBatcher<BeatsMessage> eventBatcher = getEventBatcher();
- protected void respond(final BeatsEvent event, final BeatsResponse
beatsResponse) {
- final ChannelResponse response = new
BeatsChannelResponse(beatsEncoder, beatsResponse);
+ final int batchSize =
context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+ Map<String, FlowFileEventBatch<BeatsMessage>> batches =
eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+ processEvents(session, batches);
+ }
- final ChannelResponder responder = event.getResponder();
- responder.addResponse(response);
- try {
- responder.respond();
- } catch (IOException e) {
- getLogger().error("Error sending response for transaction {} due
to {}",
- new Object[]{event.getSeqNumber(), e.getMessage()}, e);
+ @OnStopped
+ public void stopped() {
+ if (eventServer != null) {
+ eventServer.shutdown();
}
+ eventBatcher = null;
}
- protected void postProcess(final ProcessContext context, final
ProcessSession session, final List<BeatsEvent> events) {
- // first commit the session so we guarantee we have all the events
successfully
- // written to FlowFiles and transferred to the success relationship
- session.commitAsync(() -> {
- // respond to each event to acknowledge successful receipt
- for (final BeatsEvent event : events) {
- respond(event, BeatsResponse.ok(event.getSeqNumber()));
+ private void processEvents(final ProcessSession session, final Map<String,
FlowFileEventBatch<BeatsMessage>> batches) {
+ for (Map.Entry<String, FlowFileEventBatch<BeatsMessage>> entry :
batches.entrySet()) {
+ FlowFile flowFile = entry.getValue().getFlowFile();
+ final List<BeatsMessage> events = entry.getValue().getEvents();
+
+ if (flowFile.getSize() == 0L || events.size() == 0) {
+ session.remove(flowFile);
+ getLogger().debug("No data written to FlowFile from batch {};
removing FlowFile", entry.getKey());
+ continue;
}
- });
+
+ final Map<String,String> attributes =
getAttributes(entry.getValue());
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ getLogger().debug("Transferring {} to success", flowFile);
+ session.transfer(flowFile, REL_SUCCESS);
+ session.adjustCounter("FlowFiles Transferred to Success", 1L,
false);
+
+ // the sender and command will be the same for all events based on
the batch key
+ final String transitUri = getTransitUri(entry.getValue());
+ session.getProvenanceReporter().receive(flowFile, transitUri);
+
+ }
+ session.commitAsync();
}
- @Override
- protected String getTransitUri(FlowFileEventBatch batch) {
- final String sender = batch.getEvents().get(0).getSender();
+ protected String getTransitUri(FlowFileEventBatch<BeatsMessage> batch) {
+ final List<BeatsMessage> events = batch.getEvents();
+ final String sender = events.get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() >
1 ? sender.substring(1) : sender;
- final String transitUri = new
StringBuilder().append("beats").append("://").append(senderHost).append(":")
- .append(port).toString();
- return transitUri;
+ return String.format("beats://%s:%d", senderHost, port);
}
- @Override
- protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
- final List<BeatsEvent> events = batch.getEvents();
+ protected Map<String, String>
getAttributes(FlowFileEventBatch<BeatsMessage> batch) {
+ final List<BeatsMessage> events = batch.getEvents();
// the sender and command will be the same for all events based on the
batch key
final String sender = events.get(0).getSender();
final int numAttributes = events.size() == 1 ? 5 : 4;
@@ -224,6 +269,24 @@ public class ListenBeats extends
AbstractListenEventBatchingProcessor<BeatsEvent
return attributes;
}
+ private String getMessageDemarcator(final ProcessContext context) {
+ return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
+ .getValue()
+ .replace("\\n", "\n").replace("\\r", "\r").replace("\\t",
"\t");
+ }
+
+ private EventBatcher<BeatsMessage> getEventBatcher() {
+ if (eventBatcher == null) {
+ eventBatcher = new EventBatcher<BeatsMessage>(getLogger(), events,
errorEvents) {
+ @Override
+ protected String getBatchKey(BeatsMessage event) {
+ return event.getSender();
+ }
+ };
+ }
+ return eventBatcher;
+ }
+
public enum beatsAttributes implements FlowFileAttributeKey {
SENDER("beats.sender"),
PORT("beats.port"),
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
index 4d55484..2fa20ca 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsDecoder.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.beats.frame;
+import org.apache.nifi.logging.ComponentLog;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -25,7 +27,6 @@ import java.nio.charset.Charset;
import java.util.LinkedList;
import java.util.List;
import java.util.zip.InflaterInputStream;
-import org.apache.nifi.logging.ComponentLog;
/**
* Decodes a Beats frame by maintaining a state based on each byte that has
been processed. This class
@@ -52,8 +53,6 @@ public class BeatsDecoder {
static final int COMPRESSED_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 4; //
32 bit unsigned + payload
static final int JSON_MIN_LENGTH = MIN_FRAME_HEADER_LENGTH + 8; // 32 bit
unsigned sequence number + 32 bit unsigned payload length
- public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44,
FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
-
/**
* @param charset the charset to decode bytes from the frame
*/
@@ -103,11 +102,11 @@ public class BeatsDecoder {
// At one stage, the data sent to processPAYLOAD will be
represente a complete frame, so we check before returning true
- if (frameBuilder.frameType == FRAME_WINDOWSIZE &&
currState == BeatsState.COMPLETE) {
+ if (frameBuilder.frameType == BeatsFrameType.WINDOWSIZE &&
currState == BeatsState.COMPLETE) {
return true;
- } else if (frameBuilder.frameType == FRAME_COMPRESSED &&
currState == BeatsState.COMPLETE) {
+ } else if (frameBuilder.frameType ==
BeatsFrameType.COMPRESSED && currState == BeatsState.COMPLETE) {
return true;
- } else if (frameBuilder.frameType == FRAME_JSON &&
currState == BeatsState.COMPLETE) {
+ } else if (frameBuilder.frameType == BeatsFrameType.JSON
&& currState == BeatsState.COMPLETE) {
return true;
} else {
break;
@@ -138,7 +137,7 @@ public class BeatsDecoder {
}
try {
// Once compressed frames are expanded, they must be devided into
individual frames
- if (currState == BeatsState.COMPLETE && frameBuilder.frameType ==
FRAME_COMPRESSED) {
+ if (currState == BeatsState.COMPLETE && frameBuilder.frameType ==
BeatsFrameType.COMPRESSED) {
logger.debug("Frame is compressed, will iterate to decode",
new Object[]{});
// Zero currBytes, currState and frameBuilder prior to
iteration over
@@ -178,7 +177,7 @@ public class BeatsDecoder {
// 'compressed' frame type: 1D{k,v}{k,v}1D{k,v}{k,v}1D{k,v}{k,v}"
//
// Therefore, instead of calling process method again, just iterate
over each of
- // the frames and split them so they can be processed by
BeatsFrameHandler
+ // the frames and split them so they can be processed
while (currentData.hasRemaining()) {
@@ -187,7 +186,7 @@ public class BeatsDecoder {
internalFrameBuilder.version = currentData.get();
internalFrameBuilder.frameType = currentData.get();
switch (internalFrameBuilder.frameType) {
- case FRAME_JSON:
+ case BeatsFrameType.JSON:
internalFrameBuilder.seqNumber = (int)
(currentData.getInt() & 0x00000000ffffffffL);
currentData.mark();
@@ -240,7 +239,7 @@ public class BeatsDecoder {
private void processPAYLOAD(final byte b) {
currBytes.write(b);
switch (decodedFrameType) {
- case FRAME_WINDOWSIZE: //'W'
+ case BeatsFrameType.WINDOWSIZE: //'W'
if (currBytes.size() < WINDOWSIZE_LENGTH ) {
logger.trace("Beats currBytes contents are {}", new
Object[] {currBytes.toString()});
break;
@@ -257,7 +256,7 @@ public class BeatsDecoder {
logger.debug("Saw a packet I should not have seen. Packet
contents were {}", new Object[] {currBytes.toString()});
break;
}
- case FRAME_COMPRESSED: //'C'
+ case BeatsFrameType.COMPRESSED: //'C'
if (currBytes.size() < COMPRESSED_MIN_LENGTH) {
if (logger.isTraceEnabled()) {
logger.trace("Beats currBytes contents are {}", new
Object[] {currBytes.toString()});
@@ -300,7 +299,7 @@ public class BeatsDecoder {
}
break;
}
- case FRAME_JSON: // 'J́'
+ case BeatsFrameType.JSON: // 'J́'
// Because Beats can disable compression, sometimes, JSON data
will be received outside a compressed
// stream (i.e. 0x43). Instead of processing it here, we defer
its processing to went getFrames is
// called
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsMetadata.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
similarity index 72%
copy from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsMetadata.java
copy to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
index 65dd84b..77a3272 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsMetadata.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsFrameType.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.event;
+package org.apache.nifi.processors.beats.frame;
-/**
- * Metadata keys for event.
- */
-public interface BeatsMetadata {
-
- String SEQNUMBER_KEY = "beats.sequencenumber";
+public final class BeatsFrameType {
+ public static final byte WINDOWSIZE = 0x57;
+ public static final byte DATA = 0x44;
+ public static final byte COMPRESSED = 0x43;
+ public static final byte ACK = 0x41;
+ public static final byte JSON = 0x4a;
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsMetadata.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
similarity index 88%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsMetadata.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
index 65dd84b..2dc5a74 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsMetadata.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/frame/BeatsMetadata.java
@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.event;
+package org.apache.nifi.processors.beats.frame;
/**
- * Metadata keys for event.
+ * Metadata keys for Beats message.
*/
public interface BeatsMetadata {
String SEQNUMBER_KEY = "beats.sequencenumber";
+ String SENDER_KEY = "sender";
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsFrameHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsFrameHandler.java
deleted file mode 100644
index 9e72a3d..0000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsFrameHandler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.handler;
-
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
-import org.apache.nifi.processor.util.listen.event.EventQueue;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processors.beats.event.BeatsMetadata;
-import org.apache.nifi.processors.beats.frame.BeatsEncoder;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-
-
-/**
- * Encapsulates the logic to handle a BeatsFrame once it has been read from
the channel.
- */
-public class BeatsFrameHandler<E extends Event<SocketChannel>> {
-
- private final Charset charset;
- private final EventFactory<E> eventFactory;
- private final EventQueue<E> events;
- private final SelectionKey key;
- private final AsyncChannelDispatcher dispatcher;
- private final BeatsEncoder encoder;
- private final ComponentLog logger;
-
- public static final byte FRAME_WINDOWSIZE = 0x57, FRAME_DATA = 0x44,
FRAME_COMPRESSED = 0x43, FRAME_ACK = 0x41, FRAME_JSON = 0x4a;
-
- public BeatsFrameHandler(final SelectionKey selectionKey,
- final Charset charset,
- final EventFactory<E> eventFactory,
- final BlockingQueue<E> events,
- final AsyncChannelDispatcher dispatcher,
- final ComponentLog logger) {
- this.key = selectionKey;
- this.charset = charset;
- this.eventFactory = eventFactory;
- this.dispatcher = dispatcher;
- this.logger = logger;
- this.events = new EventQueue<>(events, logger);
- this.encoder = new BeatsEncoder();
- }
-
- public void handle(final BeatsFrame frame, final
ChannelResponder<SocketChannel> responder, final String sender)
- throws IOException, InterruptedException {
-
- final Map<String, String> metadata =
EventFactoryUtil.createMapWithSender(sender.toString());
- metadata.put(BeatsMetadata.SEQNUMBER_KEY,
String.valueOf(frame.getSeqNumber()));
- String line = "";
-
- /* If frameType is a JSON , parse the frame payload into a JsonElement
so that all JSON elements but "message"
- are inserted into the event metadata.
-
- As per above, the "message" element gets added into the body of the
event
- */
- if (frame.getFrameType() == FRAME_JSON ) {
- // queue the raw event blocking until space is available, reset
the buffer
- final E event = eventFactory.create(frame.getPayload(), metadata,
responder);
- events.offer(event);
- }
- }
- }
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSSLSocketChannelHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSSLSocketChannelHandler.java
deleted file mode 100644
index 042e96a..0000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSSLSocketChannelHandler.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.handler;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import
org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
-import
org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
-import org.apache.nifi.processors.beats.frame.BeatsDecoder;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-import org.apache.nifi.processors.beats.frame.BeatsFrameException;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-
-/**
- * A Beats compatible implementation of SSLSocketChannelHandler.
- */
-public class BeatsSSLSocketChannelHandler<E extends Event<SocketChannel>>
extends SSLSocketChannelHandler<E> {
-
- private BeatsDecoder decoder;
- private BeatsFrameHandler<E> frameHandler;
-
- public BeatsSSLSocketChannelHandler(final SelectionKey key,
- final AsyncChannelDispatcher
dispatcher,
- final Charset charset,
- final EventFactory<E> eventFactory,
- final BlockingQueue<E> events,
- final ComponentLog logger) {
- super(key, dispatcher, charset, eventFactory, events, logger);
- this.decoder = new BeatsDecoder(charset, logger);
- this.frameHandler = new BeatsFrameHandler<>(key, charset,
eventFactory, events, dispatcher, logger);
- }
-
- @Override
- protected void processBuffer(final SSLSocketChannel sslSocketChannel,
final SocketChannel socketChannel,
- final int bytesRead, final byte[] buffer)
throws InterruptedException, IOException {
-
- final InetAddress sender = socketChannel.socket().getInetAddress();
- try {
-
- // go through the buffer parsing the packet command
- for (int i = 0; i < bytesRead; i++) {
- byte currByte = buffer[i];
-
- // if we found the end of a frame, handle the frame and mark
the buffer
- if (decoder.process(currByte)) {
- final List<BeatsFrame> frames = decoder.getFrames();
- // A list of events has been generated
- for (BeatsFrame frame : frames) {
- logger.debug("Received Beats frame with transaction {}
and command {}",
- new Object[]{frame.getSeqNumber(),
frame.getSeqNumber()});
- // Ignore the WINDOWS type frames as they contain no
payload.
- if (frame.getFrameType() != 0x57 ) {
- final SSLSocketChannelResponder responder = new
SSLSocketChannelResponder(socketChannel, sslSocketChannel);
- frameHandler.handle(frame, responder,
sender.toString());
- }
- }
- }
- }
-
- logger.debug("Done processing buffer");
-
- } catch (final BeatsFrameException rfe) {
- logger.error("Error reading Beats frames due to {}", new Object[]
{rfe.getMessage()} , rfe);
- // if an invalid frame or bad data was sent then the decoder will
be left in a
- // corrupted state, so lets close the connection and cause the
client to re-establish
- dispatcher.completeConnection(key);
- }
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSocketChannelHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSocketChannelHandler.java
deleted file mode 100644
index bfe1e0b..0000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSocketChannelHandler.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.handler;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import
org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
-import
org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
-import org.apache.nifi.processors.beats.frame.BeatsDecoder;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-import org.apache.nifi.processors.beats.frame.BeatsFrameException;
-
-/**
- * Extends the StandardSocketChannelHandler to decode bytes into Beats frames.
- */
-public class BeatsSocketChannelHandler<E extends Event<SocketChannel>> extends
StandardSocketChannelHandler<E> {
-
- private BeatsDecoder decoder;
- private BeatsFrameHandler<E> frameHandler;
-
- public BeatsSocketChannelHandler(final SelectionKey key,
- final AsyncChannelDispatcher dispatcher,
- final Charset charset,
- final EventFactory<E> eventFactory,
- final BlockingQueue<E> events,
- final ComponentLog logger) {
- super(key, dispatcher, charset, eventFactory, events, logger);
- this.decoder = new BeatsDecoder(charset, logger);
- this.frameHandler = new BeatsFrameHandler<>(key, charset,
eventFactory, events, dispatcher, logger);
- }
-
- @Override
- protected void processBuffer(final SocketChannel socketChannel, final
ByteBuffer socketBuffer)
- throws InterruptedException, IOException {
-
- // get total bytes in buffer
- final int total = socketBuffer.remaining();
- final InetAddress sender = socketChannel.socket().getInetAddress();
-
- try {
- // go through the buffer parsing the packet command
- for (int i = 0; i < total; i++) {
- byte currByte = socketBuffer.get();
-
- // if we found the end of a frame, handle the frame and mark
the buffer
- if (decoder.process(currByte)) {
-
- final List<BeatsFrame> frames = decoder.getFrames();
-
- for (BeatsFrame frame : frames) {
- logger.debug("Received Beats frame with transaction {}
and command {}",
- new Object[]{frame.getSeqNumber(),
frame.getSeqNumber()});
- // Ignore the WINDOW SIZE type frames as they contain
no payload.
- if (frame.getFrameType() != 0x57) {
- final SocketChannelResponder responder = new
SocketChannelResponder(socketChannel);
- frameHandler.handle(frame, responder,
sender.toString());
- }
- }
- socketBuffer.mark();
- }
- }
- logger.debug("Done processing buffer");
-
- } catch (final BeatsFrameException rfe) {
- logger.error("Error reading Beats frames due to {}", new Object[]
{rfe.getMessage()}, rfe);
- // if an invalid frame or bad data was sent then the decoder will
be left in a
- // corrupted state, so lets close the connection and cause the
client to re-establish
- dispatcher.completeConnection(key);
- }
- }
-
- // not used for anything in Beats since the decoder encapsulates the
delimiter
- @Override
- public byte getDelimiter() {
- return BeatsFrame.DELIMITER;
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSocketChannelHandlerFactory.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSocketChannelHandlerFactory.java
deleted file mode 100644
index c0034f8..0000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/handler/BeatsSocketChannelHandlerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.handler;
-
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-
-/**
- * Default factory for creating Beats socket channel handlers.
- */
-public class BeatsSocketChannelHandlerFactory<E extends Event<SocketChannel>>
implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
-
- @Override
- public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final
SelectionKey key,
- final AsyncChannelDispatcher
dispatcher,
- final Charset charset,
- final EventFactory<E> eventFactory,
- final BlockingQueue<E> events,
- final ComponentLog logger) {
- return new BeatsSocketChannelHandler<>(key, dispatcher, charset,
eventFactory, events, logger);
- }
-
- @Override
- public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final
SelectionKey key,
- final AsyncChannelDispatcher
dispatcher,
- final Charset charset,
- final EventFactory<E>
eventFactory,
- final BlockingQueue<E> events,
- final ComponentLog logger) {
- return new BeatsSSLSocketChannelHandler<>(key, dispatcher, charset,
eventFactory, events, logger);
- }
-
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
new file mode 100644
index 0000000..e870660
--- /dev/null
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsFrameDecoder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processors.beats.frame.BeatsDecoder;
+import org.apache.nifi.processors.beats.frame.BeatsFrame;
+import org.apache.nifi.processors.beats.frame.BeatsFrameType;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode a Beats message's bytes into a BeatsMessage object
+ */
+public class BeatsFrameDecoder extends ByteToMessageDecoder {
+
+ private final Charset charset;
+ private final ComponentLog logger;
+ private final BeatsMessageFactory messageFactory;
+
+ public BeatsFrameDecoder(final ComponentLog logger, final Charset charset)
{
+ this.charset = charset;
+ this.logger = logger;
+ this.messageFactory = new BeatsMessageFactory();
+ }
+
+ @Override
+ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in,
final List<Object> out) {
+ final int total = in.readableBytes();
+ final String senderSocket = ctx.channel().remoteAddress().toString();
+ final BeatsDecoder decoder = new BeatsDecoder(charset, logger);
+
+ for (int i = 0; i < total; i++) {
+ byte currByte = in.readByte();
+
+ // decode the bytes and once we find the end of a frame, handle
the frame
+ if (decoder.process(currByte)) {
+ final List<BeatsFrame> frames = decoder.getFrames();
+ for (BeatsFrame frame : frames) {
+ logger.debug("Received Beats Frame Sender [{}] Transaction
[{}] Frame Type [{}]",
+ senderSocket, frame.getSeqNumber(),
frame.getFrameType());
+ // Ignore the WINDOW SIZE type frames as they contain no
payload.
+ if (frame.getFrameType() != BeatsFrameType.WINDOWSIZE) {
+ handle(frame, senderSocket, out);
+ }
+ }
+ }
+ }
+ }
+
+ private void handle(final BeatsFrame frame, final String sender, final
List<Object> out) {
+ final Map<String, String> metadata =
EventFactoryUtil.createMapWithSender(sender);
+ metadata.put(BeatsMetadata.SEQNUMBER_KEY,
String.valueOf(frame.getSeqNumber()));
+
+ if (frame.getFrameType() == BeatsFrameType.JSON) {
+ final BeatsMessage event =
messageFactory.create(frame.getPayload(), metadata);
+ out.add(event);
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsEvent.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
similarity index 64%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsEvent.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
index 032e91e..70f9d4d 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsEvent.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessage.java
@@ -14,27 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.event;
+package org.apache.nifi.processors.beats.netty;
-import org.apache.nifi.processor.util.listen.event.StandardEvent;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-
-import java.nio.channels.SocketChannel;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
/**
- * A Beat event which adds the transaction number to the StandardEvent.
+ * A Beats message which adds a sequence number to the ByteArrayMessage.
*/
-public class BeatsEvent extends StandardEvent<SocketChannel> {
+public class BeatsMessage extends ByteArrayMessage {
private final int seqNumber;
- public BeatsEvent(final String sender, final byte[] data, final
ChannelResponder<SocketChannel> responder, final int seqNumber) {
- super(sender, data, responder);
+ public BeatsMessage(final String sender, final byte[] data, final int
seqNumber) {
+ super(data, sender);
this.seqNumber = seqNumber;
}
public int getSeqNumber() {
return seqNumber;
}
-
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
new file mode 100644
index 0000000..0518a12
--- /dev/null
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageChannelHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.beats.frame.BeatsEncoder;
+import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
+import org.apache.nifi.processors.beats.response.BeatsResponse;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Queue each received BeatsMessage and acknowledge queued messages to the sender
+ */
+@ChannelHandler.Sharable
+public class BeatsMessageChannelHandler extends
SimpleChannelInboundHandler<BeatsMessage> {
+
+ private final ComponentLog componentLog;
+ private final BlockingQueue<BeatsMessage> events;
+ private final BeatsEncoder encoder;
+
+ public BeatsMessageChannelHandler(BlockingQueue<BeatsMessage> events,
ComponentLog componentLog) {
+ this.events = events;
+ this.componentLog = componentLog;
+ this.encoder = new BeatsEncoder();
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, BeatsMessage msg) {
+ componentLog.debug("Beats Message Received Length [{}] Remote Address
[{}] ", msg.getMessage().length, msg.getSender());
+ if (events.offer(msg)) {
+ componentLog.debug("Event Queued: Beats Message Sender [{}]
Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
+ BeatsChannelResponse successResponse = new
BeatsChannelResponse(encoder, BeatsResponse.ok(msg.getSeqNumber()));
+
ctx.writeAndFlush(Unpooled.wrappedBuffer(successResponse.toByteArray()));
+ } else {
+ componentLog.warn("Beats Queue Full: Failed Beats Message Sender
[{}] Sequence Number [{}]", msg.getSender(), msg.getSeqNumber());
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsEventFactory.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
similarity index 56%
rename from
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsEventFactory.java
rename to
nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
index 09afa16..73be08f 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/event/BeatsEventFactory.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageFactory.java
@@ -14,24 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.processors.beats.event;
+package org.apache.nifi.processors.beats.netty;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
+import org.apache.nifi.processors.beats.frame.BeatsMetadata;
import java.util.Map;
/**
- * An EventFactory implementation to create BeatEvents.
+ * A NetworkEventFactory implementation to create BeatsMessages.
*/
-public class BeatsEventFactory implements EventFactory<BeatsEvent> {
+public class BeatsMessageFactory implements NetworkEventFactory<BeatsMessage> {
@Override
- public BeatsEvent create(final byte[] data, final Map<String, String>
metadata, final ChannelResponder responder) {
- final String sender = metadata.get(EventFactory.SENDER_KEY);
- final int seqNumber =
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
-
- return new BeatsEvent(sender, data, responder, seqNumber);
+ public BeatsMessage create(final byte[] data, final Map<String, String>
metadata) {
+ final int sequenceNumber =
Integer.valueOf(metadata.get(BeatsMetadata.SEQNUMBER_KEY));
+ final String sender = metadata.get(BeatsMetadata.SENDER_KEY);
+ return new BeatsMessage(sender, data, sequenceNumber);
}
-
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
new file mode 100644
index 0000000..2e04141
--- /dev/null
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/netty/BeatsMessageServerFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.beats.netty;
+
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import
org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.InetAddress;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Netty Event Server Factory implementation for Beats Messages
+ */
+public class BeatsMessageServerFactory extends NettyEventServerFactory {
+
+ /**
+ * Beats Message Server Factory to receive Beats messages
+ * @param log Component Log
+ * @param address Server Address
+ * @param port Server Port Number
+ * @param charset Charset to use when decoding Beats messages
+ * @param events Blocking Queue for events received
+ */
+ public BeatsMessageServerFactory(final ComponentLog log,
+ final InetAddress address,
+ final int port,
+ final Charset charset,
+ final BlockingQueue<BeatsMessage> events)
{
+ super(address, port, TransportProtocol.TCP);
+ final BeatsMessageChannelHandler beatsChannelHandler = new
BeatsMessageChannelHandler(events, log);
+ final LogExceptionChannelHandler logExceptionChannelHandler = new
LogExceptionChannelHandler(log);
+
+ setHandlerSupplier(() -> Arrays.asList(
+ new BeatsFrameDecoder(log, charset),
+ beatsChannelHandler,
+ logExceptionChannelHandler
+ ));
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
index 8c51543..7f12c9d 100644
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
+++
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/response/BeatsResponse.java
@@ -59,5 +59,4 @@ public class BeatsResponse {
public static BeatsResponse ok(final int seqNumber) {
return new BeatsResponse(seqNumber);
}
-
}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java
deleted file mode 100644
index b0fe9c7..0000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/event/TestBeatsEventFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.beats.event;
-
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import
org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
-import org.junit.Assert;
-import org.junit.Test;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TestBeatsEventFactory {
-
- @Test
- public void testCreateLumberJackEvent() {
- final String sender = "testsender1";
- final byte[] data = "this is a test line".getBytes();
- final int seqNumber = 1;
- final String fields = "{\"file\":\"test\"}";
-
-
- final Map<String,String> metadata = new HashMap<>();
- metadata.put(EventFactory.SENDER_KEY, sender);
- metadata.put(BeatsMetadata.SEQNUMBER_KEY, String.valueOf(seqNumber));
-
- final ChannelResponder responder = new SocketChannelResponder(null);
-
- final EventFactory<BeatsEvent> factory = new BeatsEventFactory();
-
- final BeatsEvent event = factory.create(data, metadata, responder);
-
- Assert.assertEquals(sender, event.getSender());
- Assert.assertEquals(seqNumber, event.getSeqNumber());
- Assert.assertEquals(data, event.getData());
- }
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java
deleted file mode 100644
index ba0a9f2..0000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsFrameHandler.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.handler;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.processors.beats.event.BeatsEvent;
-import org.apache.nifi.processors.beats.event.BeatsEventFactory;
-import org.apache.nifi.processors.beats.frame.BeatsEncoder;
-import org.apache.nifi.processors.beats.frame.BeatsFrame;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-
-public class TestBeatsFrameHandler {
- private Charset charset;
- private EventFactory<BeatsEvent> eventFactory;
- private BlockingQueue<BeatsEvent> events;
- private SelectionKey key;
- private AsyncChannelDispatcher dispatcher;
- private BeatsEncoder encoder;
-
- private ComponentLog logger;
-
- private BeatsFrameHandler<BeatsEvent> frameHandler;
-
- @Before
- public void setup() {
- this.charset = StandardCharsets.UTF_8;
- this.eventFactory = new BeatsEventFactory();
- this.events = new LinkedBlockingQueue<>();
- this.key = Mockito.mock(SelectionKey.class);
- this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
- this.logger = Mockito.mock(ComponentLog.class);
-
- this.frameHandler = new BeatsFrameHandler<>(key, charset,
eventFactory, events, dispatcher, logger);
- }
-
- @Test
- public void testWindow() throws IOException, InterruptedException {
- final BeatsFrame openFrame = new BeatsFrame.Builder()
- .version((byte) 0x31)
- .frameType((byte) 0x57)
- .seqNumber(-1)
- .payload(Integer.toString(1).getBytes())
- .build();
-
-
- final String sender = "sender1";
- final CapturingChannelResponder responder = new
CapturingChannelResponder();
-
- // call the handler and verify respond() was called once with once
response
- frameHandler.handle(openFrame, responder, sender);
-
- // No response expected
- Assert.assertEquals(0, responder.responded);
- }
-
- @Test
- public void testJson() throws IOException, InterruptedException {
- final byte jsonPayload[] = new byte[]{
- // Payload eq { "message": "test-content", "field": "value"}
- 0x7b, 0x22, 0x6d, 0x65,
- 0x73, 0x73, 0x61, 0x67,
- 0x65, 0x22, 0x3a, 0x20,
- 0x22, 0x74, 0x65, 0x73,
- 0x74, 0x2d, 0x63, 0x6f,
- 0x6e, 0x74, 0x65, 0x6e,
- 0x74, 0x22, 0x2c, 0x20,
- 0x22, 0x66, 0x69, 0x65,
- 0x6c, 0x64, 0x22, 0x3a,
- 0x20, 0x22, 0x76, 0x61,
- 0x6c, 0x75, 0x65, 0x22,
- 0x7d
- };
-
- final BeatsFrame jsonFrame = new BeatsFrame.Builder()
- .version((byte) 0x32)
- .frameType((byte) 0x4a)
- .seqNumber(1)
- .dataSize(45)
-
- .payload(jsonPayload)
- .build();
-
-
- final String sender = "sender1";
- final CapturingChannelResponder responder = new
CapturingChannelResponder();
-
- // call the handler and verify respond() was called once with once
response
- frameHandler.handle(jsonFrame, responder, sender);
-
- // No response expected
- Assert.assertEquals(0, responder.responded);
- // But events should contain one event
- Assert.assertEquals(1, events.size());
-
- final BeatsEvent event = events.poll();
- Assert.assertEquals("{\"message\": \"test-content\", \"field\":
\"value\"}", new String(event.getData(), charset));
- }
-
-
- private static class CapturingChannelResponder implements
ChannelResponder<SocketChannel> {
-
- int responded;
- List<ChannelResponse> responses = new ArrayList<>();
-
- @Override
- public SocketChannel getChannel() {
- return Mockito.mock(SocketChannel.class);
- }
-
- @Override
- public List<ChannelResponse> getResponses() {
- return responses;
- }
-
- @Override
- public void addResponse(ChannelResponse response) {
- responses.add(response);
- }
-
- @Override
- public void respond() throws IOException {
- responded++;
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
b/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
deleted file mode 100644
index 7c6c1cb..0000000
---
a/nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/test/java/org/apache/nifi/processors/beats/handler/TestBeatsSocketChannelHandler.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.beats.handler;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import javax.net.ssl.SSLContext;
-import javax.xml.bind.DatatypeConverter;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processors.beats.event.BeatsMetadata;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-
-
-
-public class TestBeatsSocketChannelHandler {
- private EventFactory<TestEvent> eventFactory;
- private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher>
channelHandlerFactory;
- private ByteBufferSource byteBufferSource;
- private BlockingQueue<TestEvent> events;
- private ComponentLog logger = Mockito.mock(ComponentLog.class);
- private int maxConnections;
- private SSLContext sslContext;
- private Charset charset;
- private ChannelDispatcher dispatcher;
-
- @Before
- public void setup() {
- eventFactory = new TestEventHolderFactory();
- channelHandlerFactory = new BeatsSocketChannelHandlerFactory<>();
-
- byteBufferSource = new ByteBufferPool(1, 4096);
-
- events = new LinkedBlockingQueue<>();
- logger = Mockito.mock(ComponentLog.class);
-
- maxConnections = 1;
- sslContext = null;
- charset = StandardCharsets.UTF_8;
-
- dispatcher = new SocketChannelDispatcher<>(eventFactory,
channelHandlerFactory, byteBufferSource, events, logger,
- maxConnections, sslContext, charset);
-
- }
-
- @Test
- public void testWiredJsonHandling() throws IOException,
InterruptedException {
- final String singleJsonFrame =
"324a000000010000002d7b226d657373616765223a2022746573742d636f6e74656e7422" +
- "2c20226669656c64223a202276616c7565227d";
- final List<String> messages = new ArrayList<>();
- messages.add(singleJsonFrame);
-
- run(messages);
-
- // Check for the 1 frames (from the hex string above) are back...
- Assert.assertEquals(1, events.size());
-
- TestEvent event;
- while((event = events.poll()) != null) {
- Map<String, String> metadata = event.metadata;
-
Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
-
- final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY);
- final String line = new String(event.getData());
- Assert.assertTrue(seqNum.equals("1"));
- Assert.assertEquals("{\"message\": \"test-content\", \"field\":
\"value\"}", line);
- }
- }
-
- @Test
- public void testCompressedJsonHandling() throws IOException,
InterruptedException {
- final String multipleJsonFrame =
"3243000000E27801CC91414BC3401085477FCA3B6F" +
-
"93EEB6A5B8A71E3CF5ECC98BECC6491AC86643665290903FAE17A982540F8237E7F" +
-
"80D3C78EF734722BA21A2B71987C41A9E8306F819FA32303CBADCC020725078D46D" +
-
"C791836231D0EB7FDB0F933EE9354A2C129A4B44F8B8AF94197D4817CE7CCF67189" +
-
"CB2E80F74E651DADCC36357D8C2623138689B5834A4011E6E6DF7ABB55DAD770F76" +
-
"E3B7777EBB299CB58F30903C8D15C3A33CE5C465A8A74ACA2E3792A7B1E25259B4E" +
-
"87203835CD7C20ABF5FDC91886E89E8F58F237CEEF2EF1A5967BEFBFBD54F8C3162" +
- "790F0000FFFF6CB6A08D";
-
- final List<String> messages = new ArrayList<>();
- messages.add(multipleJsonFrame);
-
- run(messages);
-
- // Check for the 2 frames (from the hex string above) are back...
- Assert.assertEquals(2, events.size());
-
- boolean found1 = false;
- boolean found2 = false;
-
-
- TestEvent event;
- while((event = events.poll()) != null) {
- Map<String, String> metadata = event.metadata;
-
Assert.assertTrue(metadata.containsKey(BeatsMetadata.SEQNUMBER_KEY));
-
- final String seqNum = metadata.get(BeatsMetadata.SEQNUMBER_KEY);
- final String line = new String(event.getData());
-
- if (seqNum.equals("1") && line.contains("\"message\":\"aaaaaa\""))
{
- found1 = true;
- }
- if (seqNum.equals("2") && line.contains("\"message\":\"bbbb\"")) {
- found2 = true;
- }
- }
- Assert.assertTrue(found1 && found2);
- }
-
-
- protected void run(List<String> messages) throws IOException,
InterruptedException {
- final ByteBuffer buffer = ByteBuffer.allocate(1024);
- try {
- // starts the dispatcher listening on port 0 so it selects a
random port
- dispatcher.open(null, 0, 4096);
-
- // starts a thread to run the dispatcher which will accept/read
connections
- Thread dispatcherThread = new Thread(dispatcher);
- dispatcherThread.start();
-
-
- // create a client connection to the port the dispatcher is
listening on
- final int realPort = dispatcher.getPort();
- try (SocketChannel channel = SocketChannel.open()) {
- channel.connect(new InetSocketAddress("localhost", realPort));
- Thread.sleep(100);
-
- // send the provided messages
- for (int i=0; i < messages.size(); i++) {
- buffer.clear();
-
buffer.put(DatatypeConverter.parseHexBinary(messages.get(i)));
- buffer.flip();
-
- while (buffer.hasRemaining()) {
- channel.write(buffer);
- }
- Thread.sleep(1);
- }
- }
-
- // wait up to 10 seconds to verify the responses
- long timeout = 10000;
- long startTime = System.currentTimeMillis();
- while (events.size() < messages.size() &&
(System.currentTimeMillis() - startTime < timeout)) {
- Thread.sleep(100);
- }
-
- } finally {
- // stop the dispatcher thread and ensure we shut down handler
threads
- dispatcher.close();
- }
- }
-
- // Test event to produce from the data
- private static class TestEvent implements Event<SocketChannel> {
-
- private byte[] data;
- private Map<String, String> metadata;
-
- public TestEvent(byte[] data, Map<String, String> metadata) {
- this.data = data;
- this.metadata = metadata;
- }
-
- @Override
- public String getSender() {
- return metadata.get(EventFactory.SENDER_KEY);
- }
-
- @Override
- public byte[] getData() {
- return data;
- }
-
- @Override
- public ChannelResponder<SocketChannel> getResponder() {
- return null;
- }
- }
-
- // Factory to create test events and send responses for testing
- private static class TestEventHolderFactory implements
EventFactory<TestEvent> {
-
- @Override
- public TestEvent create(final byte[] data, final Map<String, String>
metadata, final ChannelResponder responder) {
- return new TestEvent(data, metadata);
- }
- }
-}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
index a15ef07..5ebbc40 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
@@ -25,7 +25,6 @@ import
org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.processor.ProcessContext;
-import
org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
@@ -196,7 +195,7 @@ public class TestListenRELP {
MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
runner = TestRunners.newTestRunner(mockListenRELP);
- runner.setProperty(AbstractListenEventBatchingProcessor.PORT,
Integer.toString(NetworkUtils.availablePort()));
+ runner.setProperty(ListenerProperties.PORT,
Integer.toString(NetworkUtils.availablePort()));
runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
runner.run();
@@ -206,7 +205,7 @@ public class TestListenRELP {
private void run(final List<RELPFrame> frames, final int flowFiles, final
SSLContext sslContext) throws Exception {
final int port = NetworkUtils.availablePort();
- runner.setProperty(AbstractListenEventBatchingProcessor.PORT,
Integer.toString(port));
+ runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
// Run Processor and start Dispatcher without shutting down
runner.run(1, false, true);
final byte[] relpMessages = getRELPMessages(frames);