http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java new file mode 100644 index 0000000..5af316e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.handler; + +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.standard.relp.event.RELPMetadata; +import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; +import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse; +import org.apache.nifi.processors.standard.relp.response.RELPResponse; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +/** + * Encapsulates the logic to handle a RELPFrame once it has been read from the channel. + */ +public class RELPFrameHandler<E extends Event<SocketChannel>> { + + static final String CMD_OPEN = "open"; + static final String CMD_CLOSE = "close"; + + private final Charset charset; + private final EventFactory<E> eventFactory; + private final BlockingQueue<E> events; + private final SelectionKey key; + private final AsyncChannelDispatcher dispatcher; + private final RELPEncoder encoder; + + public RELPFrameHandler(final SelectionKey selectionKey, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final AsyncChannelDispatcher dispatcher) { + this.key = selectionKey; + this.charset = charset; + this.eventFactory = eventFactory; + this.events = events; + this.dispatcher = dispatcher; + this.encoder = new RELPEncoder(charset); + } + + public void handle(final RELPFrame frame, final ChannelResponder<SocketChannel> responder, final String sender) + throws IOException, InterruptedException { + + // respond to open and close commands immediately, create and queue an event for everything else + if (CMD_OPEN.equals(frame.getCommand())) { + Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset); + ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.open(frame.getTxnr(), offers)); + responder.addResponse(response); + responder.respond(); + } else if (CMD_CLOSE.equals(frame.getCommand())) { + ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.ok(frame.getTxnr())); + responder.addResponse(response); + responder.respond(); + dispatcher.completeConnection(key); + } else { + final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); + metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr())); + metadata.put(RELPMetadata.COMMAND_KEY, frame.getCommand()); + + // queue the raw event blocking until space is available, reset the buffer + final E event = eventFactory.create(frame.getData(), metadata, responder); + events.put(event); + } + } + +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java new file mode 100644 index 0000000..8bbd116 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.handler; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler; +import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder; +import org.apache.nifi.processors.standard.relp.frame.RELPDecoder; +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; +import org.apache.nifi.processors.standard.relp.frame.RELPFrameException; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * A RELP implementation of SSLSocketChannelHandler. + */ +public class RELPSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> { + + private RELPDecoder decoder; + private RELPFrameHandler<E> frameHandler; + + public RELPSSLSocketChannelHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + super(key, dispatcher, charset, eventFactory, events, logger); + this.decoder = new RELPDecoder(charset); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + } + + @Override + protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel, + final int bytesRead, final byte[] buffer) throws InterruptedException, IOException { + + final InetAddress sender = socketChannel.socket().getInetAddress(); + try { + // go through the buffer parsing the RELP command + for (int i = 0; i < bytesRead; i++) { + byte currByte = buffer[i]; + + // if we found the end of a frame, handle the frame and mark the buffer + if (decoder.process(currByte)) { + final RELPFrame frame = decoder.getFrame(); + + logger.debug("Received RELP frame with transaction {} and command {}", + new Object[] {frame.getTxnr(), frame.getCommand()}); + + final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel); + frameHandler.handle(frame, responder, sender.toString()); + } + } + + logger.debug("Done processing buffer"); + + } catch (final RELPFrameException rfe) { + logger.error("Error reading RELP frames due to {}", new Object[] {rfe.getMessage()} , rfe); + // if an invalid frame or bad data was sent then the decoder will be left in a + // corrupted state, so lets close the connection and cause the client to re-establish + dispatcher.completeConnection(key); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java new file mode 100644 index 0000000..0bb8185 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.handler; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler; +import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; +import org.apache.nifi.processors.standard.relp.frame.RELPDecoder; +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; +import org.apache.nifi.processors.standard.relp.frame.RELPFrameException; + +import java.io.IOException; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Extends the StandardSocketChannelHandler to decode bytes into RELP frames. + */ +public class RELPSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> { + + private RELPDecoder decoder; + private RELPFrameHandler<E> frameHandler; + + public RELPSocketChannelHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + super(key, dispatcher, charset, eventFactory, events, logger); + this.decoder = new RELPDecoder(charset); + this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher); + } + + @Override + protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer) + throws InterruptedException, IOException { + + // get total bytes in buffer + final int total = socketBuffer.remaining(); + final InetAddress sender = socketChannel.socket().getInetAddress(); + + try { + // go through the buffer parsing the RELP command + for (int i = 0; i < total; i++) { + byte currByte = socketBuffer.get(); + + // if we found the end of a frame, handle the frame and mark the buffer + if (decoder.process(currByte)) { + final RELPFrame frame = decoder.getFrame(); + + logger.debug("Received RELP frame with transaction {} and command {}", + new Object[] {frame.getTxnr(), frame.getCommand()}); + + final SocketChannelResponder responder = new SocketChannelResponder(socketChannel); + frameHandler.handle(frame, responder, sender.toString()); + socketBuffer.mark(); + } + } + + logger.debug("Done processing buffer"); + + } catch (final RELPFrameException rfe) { + logger.error("Error reading RELP frames due to {}", new Object[] {rfe.getMessage()}, rfe); + // if an invalid frame or bad data was sent then the decoder will be left in a + // corrupted state, so lets close the connection and cause the client to re-establish + dispatcher.completeConnection(key); + } + } + + // not used for anything in RELP since the decoder encapsulates the delimiter + @Override + public byte getDelimiter() { + return RELPFrame.DELIMITER; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java new file mode 100644 index 0000000..2299a6b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.handler; + +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.Event; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandler; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; + +import java.nio.channels.SelectionKey; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.util.concurrent.BlockingQueue; + +/** + * Default factory for creating RELP socket channel handlers. + */ +public class RELPSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> { + + @Override + public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + return new RELPSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); + } + + @Override + public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key, + final AsyncChannelDispatcher dispatcher, + final Charset charset, + final EventFactory<E> eventFactory, + final BlockingQueue<E> events, + final ProcessorLog logger) { + return new RELPSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java new file mode 100644 index 0000000..ace73cf --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPChannelResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.response; + +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; +import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; + +/** + * Creates a RELPFrame for the provided response and returns the encoded frame. + */ +public class RELPChannelResponse implements ChannelResponse { + + private final RELPEncoder encoder; + private final RELPResponse response; + + public RELPChannelResponse(final RELPEncoder encoder, final RELPResponse response) { + this.encoder = encoder; + this.response = response; + } + + @Override + public byte[] toByteArray() { + final RELPFrame frame = response.toFrame(encoder.getCharset()); + return encoder.encode(frame); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java new file mode 100644 index 0000000..f543f26 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.response; + +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; + +/** + * The data portion of a RELPFrame for a response: + * + * RSP-CODE [SP HUMANMSG] LF [CMDDATA] + * + */ +public class RELPResponse { + + public static final int OK = 200; + public static final int ERROR = 500; + + public static final String RSP_CMD = "rsp"; + + private final long txnr; + private final int code; + private final String message; + private final String data; + + public RELPResponse(final long txnr, final int code) { + this(txnr, code, null, null); + } + + public RELPResponse(final long txnr, final int code, final String message, final String data) { + this.txnr = txnr; + this.code = code; + this.message = message; + this.data = data; + } + + /** + * Creates a RELPFrame where the data portion will contain this response. + * + * @param charset the character set to encode the response + * + * @return a RELPFrame for for this response + */ + public RELPFrame toFrame(final Charset charset) { + final StringBuilder builder = new StringBuilder(); + builder.append(code); + + if (message != null && !message.isEmpty()) { + builder.append((char)RELPFrame.SEPARATOR); + builder.append(message); + } + + if (data != null) { + builder.append((char)RELPFrame.DELIMITER); + builder.append(data); + } + + final byte[] data = builder.toString().getBytes(charset); + + return new RELPFrame.Builder() + .txnr(txnr).command(RSP_CMD) + .dataLength(data.length).data(data) + .build(); + } + + /** + * Utility method to create a response to an open request. + * + * @param txnr the transaction number of the open request + * @param offers the accepted offers + * + * @return the RELPResponse for the given open request + */ + public static RELPResponse open(final long txnr, final Map<String,String> offers) { + int i = 0; + final StringBuilder sb = new StringBuilder(); + for (final Map.Entry<String, String> entry : offers.entrySet()) { + if (i > 0) { + sb.append((char)RELPFrame.DELIMITER); + } + + sb.append(entry.getKey()); + + if (entry.getValue() != null) { + sb.append('='); + sb.append(entry.getValue()); + } + i++; + } + + return new RELPResponse(txnr, OK, "OK", sb.toString()); + } + + /** + * Utility method to create a default "OK" response. + * + * @param txnr the transaction number being responded to + * + * @return a RELPResponse with a 200 code and a message of "OK" + */ + public static RELPResponse ok(final long txnr) { + return new RELPResponse(txnr, OK, "OK", null); + } + + /** + * Utility method to create a default "ERROR" response. + * + * @param txnr the transaction number being responded to + * + * @return a RELPResponse with a 500 code and a message of "ERROR" + */ + public static RELPResponse error(final long txnr) { + return new RELPResponse(txnr, ERROR, "ERROR", null); + } + + + /** + * Parses the provided data into a Map of offers. + * + * @param data the data portion of a RELPFrame for an "open" command + * @param charset the charset to decode the data + * + * @return a Map of offers, or an empty Map if no data is provided + */ + public static Map<String,String> parseOffers(final byte[] data, final Charset charset) { + final Map<String, String> offers = new HashMap<>(); + if (data == null || data.length == 0) { + return offers; + } + + final String dataStr = new String(data, charset); + final String[] splits = dataStr.split("[" + (char)RELPFrame.DELIMITER + "]"); + + for (final String split : splits) { + final String[] fields = split.split( "=", 2); + if (fields.length > 1 ) { + offers.put(fields[0], fields[1]); + } else { + offers.put(fields[0], fields[0]); + } + } + + return offers; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java new file mode 100644 index 0000000..e7a13f1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogAttributes.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.syslog; + +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; + +/** + * FlowFile Attributes for each Syslog message. + */ +public enum SyslogAttributes implements FlowFileAttributeKey { + + PRIORITY("syslog.priority"), + SEVERITY("syslog.severity"), + FACILITY("syslog.facility"), + VERSION("syslog.version"), + TIMESTAMP("syslog.timestamp"), + HOSTNAME("syslog.hostname"), + SENDER("syslog.sender"), + BODY("syslog.body"), + VALID("syslog.valid"), + PROTOCOL("syslog.protocol"), + PORT("syslog.port"); + + private String key; + + SyslogAttributes(String key) { + this.key = key; + } + + @Override + public String key() { + return key; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java new file mode 100644 index 0000000..29243cd --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogEvent.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.syslog; + +/** + * Encapsulates the parsed information for a single Syslog event. + */ +public class SyslogEvent { + + private final String priority; + private final String severity; + private final String facility; + private final String version; + private final String timeStamp; + private final String hostName; + private final String sender; + private final String msgBody; + private final String fullMessage; + private final byte[] rawMessage; + private final boolean valid; + + private SyslogEvent(final Builder builder) { + this.priority = builder.priority; + this.severity = builder.severity; + this.facility = builder.facility; + this.version = builder.version; + this.timeStamp = builder.timeStamp; + this.hostName = builder.hostName; + this.sender = builder.sender; + this.msgBody = builder.msgBody; + this.fullMessage = builder.fullMessage; + this.rawMessage = builder.rawMessage; + this.valid = builder.valid; + } + + public String getPriority() { + return priority; + } + + public String getSeverity() { + return severity; + } + + public String getFacility() { + return facility; + } + + public String getVersion() { + return version; + } + + public String getTimeStamp() { + return timeStamp; + } + + public String getHostName() { + return hostName; + } + + public String getSender() { + return sender; + } + + public String getMsgBody() { + return msgBody; + } + + public String getFullMessage() { + return fullMessage; + } + + public byte[] getRawMessage() { + return rawMessage; + } + + public boolean isValid() { + return valid; + } + + public static final class Builder { + private String priority; + private String severity; + private String facility; + private String version; + private String timeStamp; + private String hostName; + private String sender; + private String msgBody; + private String fullMessage; + private byte[] rawMessage; + private boolean valid; + + public void reset() { + this.priority = null; + this.severity = null; + this.facility = null; + this.version = null; + this.timeStamp = null; + this.hostName = null; + this.sender = null; + this.msgBody = null; + this.fullMessage = null; + this.valid = false; + } + + public Builder priority(String priority) { + this.priority = priority; + return this; + } + + public Builder severity(String severity) { + this.severity = severity; + return this; + } + + public Builder facility(String facility) { + this.facility = facility; + return this; + } + + public Builder version(String version) { + this.version = version; + return this; + } + + public Builder timestamp(String timestamp) { + this.timeStamp = timestamp; + return this; + } + + public Builder hostname(String hostName) { + this.hostName = hostName; + return this; + } + + public Builder sender(String sender) { + this.sender = sender; + return this; + } + + public Builder msgBody(String msgBody) { + this.msgBody = msgBody; + return this; + } + + public Builder fullMessage(String fullMessage) { + this.fullMessage = fullMessage; + return this; + } + + public Builder rawMessage(byte[] rawMessage) { + this.rawMessage = rawMessage; + return this; + } + + public Builder valid(boolean valid) { + this.valid = valid; + return this; + } + + public SyslogEvent build() { + return new SyslogEvent(this); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java new file mode 100644 index 0000000..b5dbf22 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/syslog/SyslogParser.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.syslog; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.regex.MatchResult; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance. + * + * The Syslog regular expressions below were adapted from the Apache Flume project. + */ +public class SyslogParser { + + public static final String SYSLOG_MSG_RFC5424_0 = + "(?:\\<(\\d{1,3})\\>)" + // priority + "(?:(\\d)?\\s?)" + // version + /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */ + "(?:" + + "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" + + "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp + "\\s" + // separator + "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null) + "\\s" + // separator + "(.*)$"; // body + + public static final String SYSLOG_MSG_RFC3164_0 = + "(?:\\<(\\d{1,3})\\>)" + + "(?:(\\d)?\\s?)" + // version + // stamp MMM d HH:mm:ss, single digit date has two spaces + "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + + "\\s" + // separator + "([\\w][\\w\\d\\.@-]*)" + // host + "\\s(.*)$"; // body + + public static final Collection<Pattern> MESSAGE_PATTERNS; + static { + List<Pattern> patterns = new ArrayList<>(); + patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0)); + patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0)); + MESSAGE_PATTERNS = Collections.unmodifiableList(patterns); + } + + // capture group positions from the above message patterns + public static final int SYSLOG_PRIORITY_POS = 1; + public static final int SYSLOG_VERSION_POS = 2; + public static final int SYSLOG_TIMESTAMP_POS = 3; + public static final int SYSLOG_HOSTNAME_POS = 4; + public static final int SYSLOG_BODY_POS = 5; + + private Charset charset; + + public SyslogParser(final Charset charset) { + this.charset = charset; + } + + /** + * Parses a SyslogEvent from a byte buffer. + * + * @param buffer a byte buffer containing a syslog message + * @return a SyslogEvent parsed from the byte array + */ + public SyslogEvent parseEvent(final ByteBuffer buffer) { + return parseEvent(buffer, null); + } + + /** + * Parses a SyslogEvent from a byte buffer. + * + * @param buffer a byte buffer containing a syslog message + * @param sender the hostname of the syslog server that sent the message + * @return a SyslogEvent parsed from the byte array + */ + public SyslogEvent parseEvent(final ByteBuffer buffer, final String sender) { + if (buffer == null) { + return null; + } + if (buffer.position() != 0) { + buffer.flip(); + } + byte bytes[] = new byte[buffer.limit()]; + buffer.get(bytes, 0, buffer.limit()); + return parseEvent(bytes, sender); + } + + /** + * Parses a SyslogEvent from a byte array. + * + * @param bytes a byte array containing a syslog message + * @param sender the hostname of the syslog server that sent the message + * @return a SyslogEvent parsed from the byte array + */ + public SyslogEvent parseEvent(final byte[] bytes, final String sender) { + if (bytes == null || bytes.length == 0) { + return null; + } + + // remove trailing new line before parsing + int length = bytes.length; + if (bytes[length - 1] == '\n') { + length = length - 1; + } + + final String message = new String(bytes, 0, length, charset); + + final SyslogEvent.Builder builder = new SyslogEvent.Builder() + .valid(false).fullMessage(message).rawMessage(bytes).sender(sender); + + for (Pattern pattern : MESSAGE_PATTERNS) { + final Matcher matcher = pattern.matcher(message); + if (!matcher.matches()) { + continue; + } + + final MatchResult res = matcher.toMatchResult(); + for (int grp = 1; grp <= res.groupCount(); grp++) { + String value = res.group(grp); + if (grp == SYSLOG_TIMESTAMP_POS) { + builder.timestamp(value); + } else if (grp == SYSLOG_HOSTNAME_POS) { + builder.hostname(value); + } else if (grp == SYSLOG_PRIORITY_POS) { + int pri = Integer.parseInt(value); + int sev = pri % 8; + int facility = pri / 8; + builder.priority(value); + builder.severity(String.valueOf(sev)); + builder.facility(String.valueOf(facility)); + } else if (grp == SYSLOG_VERSION_POS) { + builder.version(value); + } else if (grp == SYSLOG_BODY_POS) { + builder.msgBody(value); + } + } + + builder.valid(true); + break; + } + + // either invalid w/original msg, or fully parsed event + return builder.build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java deleted file mode 100644 index 3d06dbe..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogEvent.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -/** - * Encapsulates the parsed information for a single Syslog event. - */ -public class SyslogEvent { - - private final String priority; - private final String severity; - private final String facility; - private final String version; - private final String timeStamp; - private final String hostName; - private final String sender; - private final String msgBody; - private final String fullMessage; - private final byte[] rawMessage; - private final boolean valid; - - private SyslogEvent(final Builder builder) { - this.priority = builder.priority; - this.severity = builder.severity; - this.facility = builder.facility; - this.version = builder.version; - this.timeStamp = builder.timeStamp; - this.hostName = builder.hostName; - this.sender = builder.sender; - this.msgBody = builder.msgBody; - this.fullMessage = builder.fullMessage; - this.rawMessage = builder.rawMessage; - this.valid = builder.valid; - } - - public String getPriority() { - return priority; - } - - public String getSeverity() { - return severity; - } - - public String getFacility() { - return facility; - } - - public String getVersion() { - return version; - } - - public String getTimeStamp() { - return timeStamp; - } - - public String getHostName() { - return hostName; - } - - public String getSender() { - return sender; - } - - public String getMsgBody() { - return msgBody; - } - - public String getFullMessage() { - return fullMessage; - } - - public byte[] getRawMessage() { - return rawMessage; - } - - public boolean isValid() { - return valid; - } - - public static final class Builder { - private String priority; - private String severity; - private String facility; - private String version; - private String timeStamp; - private String hostName; - private String sender; - private String msgBody; - private String fullMessage; - private byte[] rawMessage; - private boolean valid; - - public void reset() { - this.priority = null; - this.severity = null; - this.facility = null; - this.version = null; - this.timeStamp = null; - this.hostName = null; - this.sender = null; - this.msgBody = null; - this.fullMessage = null; - this.valid = false; - } - - public Builder priority(String priority) { - this.priority = priority; - return this; - } - - public Builder severity(String severity) { - this.severity = severity; - return this; - } - - public Builder facility(String facility) { - this.facility = facility; - return this; - } - - public Builder version(String version) { - this.version = version; - return this; - } - - public Builder timestamp(String timestamp) { - this.timeStamp = timestamp; - return this; - } - - public Builder hostname(String hostName) { - this.hostName = hostName; - return this; - } - - public Builder sender(String sender) { - this.sender = sender; - return this; - } - - public Builder msgBody(String msgBody) { - this.msgBody = msgBody; - return this; - } - - public Builder fullMessage(String fullMessage) { - this.fullMessage = fullMessage; - return this; - } - - public Builder rawMessage(byte[] rawMessage) { - this.rawMessage = rawMessage; - return this; - } - - public Builder valid(boolean valid) { - this.valid = valid; - return this; - } - - public SyslogEvent build() { - return new SyslogEvent(this); - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java deleted file mode 100644 index fd59e5b..0000000 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SyslogParser.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.standard.util; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.regex.MatchResult; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Parses a Syslog message from a ByteBuffer into a SyslogEvent instance. - * - * The Syslog regular expressions below were adapted from the Apache Flume project. - */ -public class SyslogParser { - - public static final String SYSLOG_MSG_RFC5424_0 = - "(?:\\<(\\d{1,3})\\>)" + // priority - "(?:(\\d)?\\s?)" + // version - /* yyyy-MM-dd'T'HH:mm:ss.SZ or yyyy-MM-dd'T'HH:mm:ss.S+hh:mm or - (null stamp) */ - "(?:" + - "(\\d{4}[-]\\d{2}[-]\\d{2}[T]\\d{2}[:]\\d{2}[:]\\d{2}" + - "(?:\\.\\d{1,6})?(?:[+-]\\d{2}[:]\\d{2}|Z)?)|-)" + // stamp - "\\s" + // separator - "(?:([\\w][\\w\\d\\.@\\-]*)|-)" + // host name or - (null) - "\\s" + // separator - "(.*)$"; // body - - public static final String SYSLOG_MSG_RFC3164_0 = - "(?:\\<(\\d{1,3})\\>)" + - "(?:(\\d)?\\s?)" + // version - // stamp MMM d HH:mm:ss, single digit date has two spaces - "([A-Z][a-z][a-z]\\s{1,2}\\d{1,2}\\s\\d{2}[:]\\d{2}[:]\\d{2})" + - "\\s" + // separator - "([\\w][\\w\\d\\.@-]*)" + // host - "\\s(.*)$"; // body - - public static final Collection<Pattern> MESSAGE_PATTERNS; - static { - List<Pattern> patterns = new ArrayList<>(); - patterns.add(Pattern.compile(SYSLOG_MSG_RFC5424_0)); - patterns.add(Pattern.compile(SYSLOG_MSG_RFC3164_0)); - MESSAGE_PATTERNS = Collections.unmodifiableList(patterns); - } - - // capture group positions from the above message patterns - public static final int SYSLOG_PRIORITY_POS = 1; - public static final int SYSLOG_VERSION_POS = 2; - public static final int SYSLOG_TIMESTAMP_POS = 3; - public static final int SYSLOG_HOSTNAME_POS = 4; - public static final int SYSLOG_BODY_POS = 5; - - private Charset charset; - - public SyslogParser(final Charset charset) { - this.charset = charset; - } - - /** - * Parses a SyslogEvent from a byte buffer. - * - * @param buffer a byte buffer containing a syslog message - * @return a SyslogEvent parsed from the byte array - */ - public SyslogEvent parseEvent(final ByteBuffer buffer) { - return parseEvent(buffer, null); - } - - /** - * Parses a SyslogEvent from a byte buffer. - * - * @param buffer a byte buffer containing a syslog message - * @param sender the hostname of the syslog server that sent the message - * @return a SyslogEvent parsed from the byte array - */ - public SyslogEvent parseEvent(final ByteBuffer buffer, final String sender) { - if (buffer == null) { - return null; - } - if (buffer.position() != 0) { - buffer.flip(); - } - byte bytes[] = new byte[buffer.limit()]; - buffer.get(bytes, 0, buffer.limit()); - return parseEvent(bytes, sender); - } - - /** - * Parses a SyslogEvent from a byte array. - * - * @param bytes a byte array containing a syslog message - * @param sender the hostname of the syslog server that sent the message - * @return a SyslogEvent parsed from the byte array - */ - public SyslogEvent parseEvent(final byte[] bytes, final String sender) { - if (bytes == null || bytes.length == 0) { - return null; - } - - // remove trailing new line before parsing - int length = bytes.length; - if (bytes[length - 1] == '\n') { - length = length - 1; - } - - final String message = new String(bytes, 0, length, charset); - - final SyslogEvent.Builder builder = new SyslogEvent.Builder() - .valid(false).fullMessage(message).rawMessage(bytes).sender(sender); - - for (Pattern pattern : MESSAGE_PATTERNS) { - final Matcher matcher = pattern.matcher(message); - if (!matcher.matches()) { - continue; - } - - final MatchResult res = matcher.toMatchResult(); - for (int grp = 1; grp <= res.groupCount(); grp++) { - String value = res.group(grp); - if (grp == SYSLOG_TIMESTAMP_POS) { - builder.timestamp(value); - } else if (grp == SYSLOG_HOSTNAME_POS) { - builder.hostname(value); - } else if (grp == SYSLOG_PRIORITY_POS) { - int pri = Integer.parseInt(value); - int sev = pri % 8; - int facility = pri / 8; - builder.priority(value); - builder.severity(String.valueOf(sev)); - builder.facility(String.valueOf(facility)); - } else if (grp == SYSLOG_VERSION_POS) { - builder.version(value); - } else if (grp == SYSLOG_BODY_POS) { - builder.msgBody(value); - } - } - - builder.valid(true); - break; - } - - // either invalid w/original msg, or fully parsed event - return builder.build(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index befa5e7..66f1c27 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -46,6 +46,7 @@ org.apache.nifi.processors.standard.GetJMSQueue org.apache.nifi.processors.standard.GetJMSTopic org.apache.nifi.processors.standard.ListFile org.apache.nifi.processors.standard.ListenHTTP +org.apache.nifi.processors.standard.ListenRELP org.apache.nifi.processors.standard.ListenSyslog org.apache.nifi.processors.standard.ListenUDP org.apache.nifi.processors.standard.ListSFTP http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java new file mode 100644 index 0000000..877a55a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processors.standard.relp.event.RELPEvent; +import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; +import org.apache.nifi.processors.standard.relp.response.RELPResponse; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.StandardSSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +public class TestListenRELP { + + public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog"; + public static final String SYSLOG_FRAME_DATA = "this is a syslog message here"; + + static final RELPFrame OPEN_FRAME = new RELPFrame.Builder() + .txnr(1) + .command("open") + .dataLength(OPEN_FRAME_DATA.length()) + .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8)) + .build(); + + static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder() + .txnr(2) + .command("syslog") + .dataLength(SYSLOG_FRAME_DATA.length()) + .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8)) + .build(); + + static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder() + .txnr(3) + .command("close") + .dataLength(0) + .data(new byte[0]) + .build(); + + private RELPEncoder encoder; + private ResponseCapturingListenRELP proc; + private TestRunner runner; + + @Before + public void setup() { + encoder = new RELPEncoder(StandardCharsets.UTF_8); + proc = new ResponseCapturingListenRELP(); + runner = TestRunners.newTestRunner(proc); + runner.setProperty(ListenSyslog.PORT, "0"); + } + + @Test + public void testListenRELP() throws IOException, InterruptedException { + final List<RELPFrame> frames = new ArrayList<>(); + frames.add(OPEN_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(CLOSE_FRAME); + + // three syslog frames should be transferred and three responses should be sent + run(frames, 3, 3, null); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(3, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp")); + + final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS); + Assert.assertEquals(3, mockFlowFiles.size()); + + final MockFlowFile mockFlowFile = mockFlowFiles.get(0); + Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key())); + Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key())); + Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key()))); + Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key()))); + } + + @Test + public void testBatching() throws IOException, InterruptedException { + runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5"); + + final List<RELPFrame> frames = new ArrayList<>(); + frames.add(OPEN_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(CLOSE_FRAME); + + // one syslog frame should be transferred since we are batching, but three responses should be sent + run(frames, 1, 3, null); + + final List<ProvenanceEventRecord> events = runner.getProvenanceEvents(); + Assert.assertNotNull(events); + Assert.assertEquals(1, events.size()); + + final ProvenanceEventRecord event = events.get(0); + Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType()); + Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp")); + + final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS); + Assert.assertEquals(1, mockFlowFiles.size()); + + final MockFlowFile mockFlowFile = mockFlowFiles.get(0); + Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key())); + Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key()))); + Assert.assertTrue(!StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key()))); + } + + @Test + public void testTLS() throws InitializationException, IOException, InterruptedException { + final SSLContextService sslContextService = new StandardSSLContextService(); + runner.addControllerService("ssl-context", sslContextService); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE, "src/test/resources/localhost-ts.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.TRUSTSTORE_TYPE, "JKS"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE, "src/test/resources/localhost-ks.jks"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_PASSWORD, "localtest"); + runner.setProperty(sslContextService, StandardSSLContextService.KEYSTORE_TYPE, "JKS"); + runner.enableControllerService(sslContextService); + + runner.setProperty(PostHTTP.SSL_CONTEXT_SERVICE, "ssl-context"); + + final List<RELPFrame> frames = new ArrayList<>(); + frames.add(OPEN_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(SYSLOG_FRAME); + frames.add(CLOSE_FRAME); + + // three syslog frames should be transferred and three responses should be sent + run(frames, 5, 5, sslContextService); + } + + protected void run(final List<RELPFrame> frames, final int expectedTransferred, final int expectedResponses, final SSLContextService sslContextService) + throws IOException, InterruptedException { + + Socket socket = null; + try { + // schedule to start listening on a random port + final ProcessSessionFactory processSessionFactory = runner.getProcessSessionFactory(); + final ProcessContext context = runner.getProcessContext(); + proc.onScheduled(context); + + // create a client connection to the port the dispatcher is listening on + final int realPort = proc.getDispatcherPort(); + + // create either a regular socket or ssl socket based on context being passed in + if (sslContextService != null) { + final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + socket = sslContext.getSocketFactory().createSocket("localhost", realPort); + } else { + socket = new Socket("localhost", realPort); + } + Thread.sleep(100); + + // send the frames to the port the processors is listening on + sendFrames(frames, socket); + + // call onTrigger until we processed all the frames, or a certain amount of time passes + long responseTimeout = 10000; + long startTime = System.currentTimeMillis(); + while (proc.responses.size() < expectedTransferred + && (System.currentTimeMillis() - startTime < responseTimeout)) { + proc.onTrigger(context, processSessionFactory); + Thread.sleep(100); + } + + // should have gotten a response for each frame + Assert.assertEquals(expectedResponses, proc.responses.size()); + + // should have transferred the expected events + runner.assertTransferCount(ListenRELP.REL_SUCCESS, expectedTransferred); + + } finally { + // unschedule to close connections + proc.onUnscheduled(); + IOUtils.closeQuietly(socket); + } + } + + private void sendFrames(final List<RELPFrame> frames, final Socket socket) throws IOException, InterruptedException { + // send the provided messages + for (final RELPFrame frame : frames) { + byte[] encodedFrame = encoder.encode(frame); + socket.getOutputStream().write(encodedFrame); + Thread.sleep(1); + } + socket.getOutputStream().flush(); + } + + // Extend ListenRELP so we can use the CapturingSocketChannelResponseDispatcher + private static class ResponseCapturingListenRELP extends ListenRELP { + + private List<RELPResponse> responses = new ArrayList<>(); + + @Override + protected void respond(RELPEvent event, RELPResponse relpResponse) { + this.responses.add(relpResponse); + super.respond(event, relpResponse); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java index 5fb26c8..360dfe7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java @@ -16,19 +16,6 @@ */ package org.apache.nifi.processors.standard; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.Validator; @@ -36,9 +23,9 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.standard.ListenSyslog.RawSyslogEvent; -import org.apache.nifi.processors.standard.util.SyslogEvent; -import org.apache.nifi.processors.standard.util.SyslogParser; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; +import org.apache.nifi.processors.standard.syslog.SyslogEvent; +import org.apache.nifi.processors.standard.syslog.SyslogParser; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.IntegerHolder; @@ -50,6 +37,19 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static org.junit.Assert.assertEquals; + public class TestListenSyslog { static final Logger LOGGER = LoggerFactory.getLogger(TestListenSyslog.class); @@ -301,9 +301,9 @@ public class TestListenSyslog { runner.assertAllFlowFilesTransferred(ListenSyslog.REL_SUCCESS, 1); final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenSyslog.REL_SUCCESS).get(0); - Assert.assertEquals("0", flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key())); - Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key())); - Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()))); + Assert.assertEquals("0", flowFile.getAttribute(SyslogAttributes.PORT.key())); + Assert.assertEquals(ListenSyslog.UDP_VALUE.getValue(), flowFile.getAttribute(SyslogAttributes.PROTOCOL.key())); + Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key()))); final String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8); final String[] splits = content.split("\\|"); @@ -391,16 +391,16 @@ public class TestListenSyslog { @Test public void testErrorQueue() throws IOException { - final List<RawSyslogEvent> msgs = new ArrayList<>(); - msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01")); - msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01")); + final List<ListenSyslog.RawSyslogEvent> msgs = new ArrayList<>(); + msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01")); + msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01")); // Add message that will throw a FlowFileAccessException the first time that we attempt to read - // the contents but will succeeed the second time. + // the contents but will succeed the second time. final IntegerHolder getMessageAttempts = new IntegerHolder(0); - msgs.add(new RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") { + msgs.add(new ListenSyslog.RawSyslogEvent(VALID_MESSAGE.getBytes(), "sender-01") { @Override - public byte[] getRawMessage() { + public byte[] getData() { final int attempts = getMessageAttempts.incrementAndGet(); if (attempts == 1) { throw new FlowFileAccessException("Unit test failure"); @@ -432,16 +432,16 @@ public class TestListenSyslog { private void checkFlowFile(final MockFlowFile flowFile, final int port, final String protocol) { flowFile.assertContentEquals(VALID_MESSAGE); - Assert.assertEquals(PRI, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PRIORITY.key())); - Assert.assertEquals(SEV, flowFile.getAttribute(ListenSyslog.SyslogAttributes.SEVERITY.key())); - Assert.assertEquals(FAC, flowFile.getAttribute(ListenSyslog.SyslogAttributes.FACILITY.key())); - Assert.assertEquals(TIME, flowFile.getAttribute(ListenSyslog.SyslogAttributes.TIMESTAMP.key())); - Assert.assertEquals(HOST, flowFile.getAttribute(ListenSyslog.SyslogAttributes.HOSTNAME.key())); - Assert.assertEquals(BODY, flowFile.getAttribute(ListenSyslog.SyslogAttributes.BODY.key())); - Assert.assertEquals("true", flowFile.getAttribute(ListenSyslog.SyslogAttributes.VALID.key())); - Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(ListenSyslog.SyslogAttributes.PORT.key())); - Assert.assertEquals(protocol, flowFile.getAttribute(ListenSyslog.SyslogAttributes.PROTOCOL.key())); - Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(ListenSyslog.SyslogAttributes.SENDER.key()))); + Assert.assertEquals(PRI, flowFile.getAttribute(SyslogAttributes.PRIORITY.key())); + Assert.assertEquals(SEV, flowFile.getAttribute(SyslogAttributes.SEVERITY.key())); + Assert.assertEquals(FAC, flowFile.getAttribute(SyslogAttributes.FACILITY.key())); + Assert.assertEquals(TIME, flowFile.getAttribute(SyslogAttributes.TIMESTAMP.key())); + Assert.assertEquals(HOST, flowFile.getAttribute(SyslogAttributes.HOSTNAME.key())); + Assert.assertEquals(BODY, flowFile.getAttribute(SyslogAttributes.BODY.key())); + Assert.assertEquals("true", flowFile.getAttribute(SyslogAttributes.VALID.key())); + Assert.assertEquals(String.valueOf(port), flowFile.getAttribute(SyslogAttributes.PORT.key())); + Assert.assertEquals(protocol, flowFile.getAttribute(SyslogAttributes.PROTOCOL.key())); + Assert.assertTrue(!StringUtils.isBlank(flowFile.getAttribute(SyslogAttributes.SENDER.key()))); } /** http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java index a1a4d04..9e84b37 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestParseSyslog.java @@ -17,7 +17,7 @@ package org.apache.nifi.processors.standard; -import org.apache.nifi.processors.standard.AbstractSyslogProcessor.SyslogAttributes; +import org.apache.nifi.processors.standard.syslog.SyslogAttributes; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java new file mode 100644 index 0000000..06ed76e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/RELPFrameProducer.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.processors.standard.relp.frame.RELPEncoder; +import org.apache.nifi.processors.standard.relp.frame.RELPFrame; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.charset.StandardCharsets; + +public class RELPFrameProducer { + + public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog"; + + static final RELPFrame OPEN_FRAME = new RELPFrame.Builder() + .txnr(1) + .command("open") + .dataLength(OPEN_FRAME_DATA.length()) + .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8)) + .build(); + + static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder() + .txnr(3) + .command("close") + .dataLength(0) + .data(new byte[0]) + .build(); + + public static void main(String[] args) { + if (args == null || args.length != 5) { + System.err.println("USAGE: RELPFrameProducer <HOST> <PORT> <NUM_MSGS> <DELAY_INTERVAL> <DELAY_MILLIS>"); + System.exit(1); + } + + final String host = args[0]; + final int port = Integer.parseInt(args[1]); + final int numMessages = Integer.parseInt(args[2]); + final int delayInterval = Integer.parseInt(args[3]); + final long delay = Long.parseLong(args[4]); + + final RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8); + + Socket socket = null; + try { + socket = new Socket(host, port); + + try (final OutputStream out = new BufferedOutputStream(socket.getOutputStream())) { + // send the open frame + out.write(encoder.encode(OPEN_FRAME)); + + // send the specified number of syslog messages + for (int i=2; i < (numMessages+2); i++) { + final byte[] data = ("this is message # " + i).getBytes(StandardCharsets.UTF_8); + + final RELPFrame syslogFrame = new RELPFrame.Builder() + .txnr(i) + .command("syslog") + .dataLength(data.length) + .data(data) + .build(); + + out.write(encoder.encode(syslogFrame)); + + if (i % delayInterval == 0) { + System.out.println("Sent " + i + " messages"); + out.flush(); + Thread.sleep(delay); + } + } + + // send the close frame + out.write(encoder.encode(CLOSE_FRAME)); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + + } catch (final IOException e) { + e.printStackTrace(); + } finally { + IOUtils.closeQuietly(socket); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1089f0a9/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java new file mode 100644 index 0000000..5d86c26 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.relp.event; + +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +public class TestRELPEventFactory { + + @Test + public void testCreateRELPEvent() { + final byte[] data = "this is an event".getBytes(StandardCharsets.UTF_8); + + final String sender = "sender1"; + final long txnr = 1; + final String command = "syslog"; + + final Map<String,String> metadata = new HashMap<>(); + metadata.put(EventFactory.SENDER_KEY, sender); + metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(txnr)); + metadata.put(RELPMetadata.COMMAND_KEY, command); + + final ChannelResponder responder = new SocketChannelResponder(null); + + final EventFactory<RELPEvent> factory = new RELPEventFactory(); + + final RELPEvent event = factory.create(data, metadata, responder); + Assert.assertEquals(data, event.getData()); + Assert.assertEquals(sender, event.getSender()); + Assert.assertEquals(txnr, event.getTxnr()); + Assert.assertEquals(command, event.getCommand()); + Assert.assertEquals(responder, event.getResponder()); + } +}
