This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9a125dc NIFI-8616 Refactored PutTCP, PutUDP, and PutSplunk to use
Netty nifi-event-transport classes
9a125dc is described below
commit 9a125dcaafbaa922066f067d0c5e65f7286f8d44
Author: Nathan Gough <[email protected]>
AuthorDate: Tue Jun 22 15:45:38 2021 -0400
NIFI-8616 Refactored PutTCP, PutUDP, and PutSplunk to use Netty
nifi-event-transport classes
This closes #5182
Signed-off-by: David Handermann <[email protected]>
---
.../ByteArrayMessageNettyEventServerFactory.java | 1 -
.../netty/ByteArrayNettyEventSenderFactory.java | 46 +++
.../transport/netty/DelimitedInputStream.java | 43 +++
.../event/transport/netty/NettyEventSender.java | 15 +-
.../transport/netty/NettyEventSenderFactory.java | 27 +-
.../transport/netty/NettyEventServerFactory.java | 14 +-
.../netty/StreamingNettyEventSenderFactory.java | 51 +++
.../netty/codec/InputStreamMessageEncoder.java | 47 +++
.../transport/netty/NettyEventSenderTest.java | 2 +-
.../nifi-processor-utils/pom.xml | 6 +
.../util/put/AbstractPutEventProcessor.java | 214 +++----------
.../apache/nifi/processors/splunk/PutSplunk.java | 79 ++---
.../nifi/processors/splunk/TestPutSplunk.java | 341 +++++++--------------
.../apache/nifi/processors/standard/PutTCP.java | 114 ++-----
.../apache/nifi/processors/standard/PutUDP.java | 45 +--
.../nifi/processors/standard/TestListenSyslog.java | 1 -
.../nifi/processors/standard/TestPutTCP.java | 109 +++----
.../nifi/processors/standard/TestPutUDP.java | 94 +++---
.../processors/standard/util/TCPTestServer.java | 166 ----------
19 files changed, 556 insertions(+), 859 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
index 9647565..b6cd5bf 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayMessageNettyEventServerFactory.java
@@ -55,7 +55,6 @@ public class ByteArrayMessageNettyEventServerFactory extends
NettyEventServerFac
final int maxFrameLength,
final
BlockingQueue<ByteArrayMessage> messages) {
super(address, port, protocol);
-
final LogExceptionChannelHandler logExceptionChannelHandler = new
LogExceptionChannelHandler(log);
final ByteArrayMessageChannelHandler byteArrayMessageChannelHandler =
new ByteArrayMessageChannelHandler(messages);
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
new file mode 100644
index 0000000..71219b3
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/ByteArrayNettyEventSenderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.handler.codec.bytes.ByteArrayEncoder;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import
org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
+import org.apache.nifi.logging.ComponentLog;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Netty Event Sender Factory for byte array messages
+ */
+public class ByteArrayNettyEventSenderFactory extends
NettyEventSenderFactory<byte[]> {
+ /**
+ * Netty Event Sender Factory using byte array
+ *
+ * @param log Component Log
+ * @param address Remote Address
+ * @param port Remote Port Number
+ * @param protocol Channel Protocol
+ */
+ public ByteArrayNettyEventSenderFactory(final ComponentLog log, final
String address, final int port, final TransportProtocol protocol) {
+ super(address, port, protocol);
+ final List<ChannelHandler> handlers = new ArrayList<>();
+ handlers.add(new LogExceptionChannelHandler(log));
+ handlers.add(new ByteArrayEncoder());
+ setHandlerSupplier(() -> handlers);
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/DelimitedInputStream.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/DelimitedInputStream.java
new file mode 100644
index 0000000..4f6a4b1
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/DelimitedInputStream.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A delimited InputStream which retains a delimiter for later use
+ */
+public class DelimitedInputStream extends InputStream {
+
+ private final InputStream in;
+ private final byte[] delimiter;
+
+ public DelimitedInputStream(final InputStream in, final byte[] delimiter) {
+ this.in = in;
+ this.delimiter = delimiter;
+ }
+
+ public byte[] getDelimiter() {
+ return delimiter;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
index 7d9346c..4cc2b24 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSender.java
@@ -38,17 +38,21 @@ class NettyEventSender<T> implements EventSender<T> {
private final SocketAddress remoteAddress;
+ private boolean singleEventPerConnection;
+
/**
* Netty Channel Event Sender with Event Loop Group and Channel Pool
*
* @param group Event Loop Group
* @param channelPool Channel Pool
* @param remoteAddress Remote Address
+ * @param singleEventPerConnection If true, send a single event per
connection, and then close it.
*/
- NettyEventSender(final EventLoopGroup group, final ChannelPool
channelPool, final SocketAddress remoteAddress) {
+ NettyEventSender(final EventLoopGroup group, final ChannelPool
channelPool, final SocketAddress remoteAddress, final boolean
singleEventPerConnection) {
this.group = group;
this.channelPool = channelPool;
this.remoteAddress = remoteAddress;
+ this.singleEventPerConnection = singleEventPerConnection;
}
/**
@@ -65,7 +69,7 @@ class NettyEventSender<T> implements EventSender<T> {
final ChannelFuture channelFuture =
channel.writeAndFlush(event);
channelFuture.syncUninterruptibly();
} finally {
- channelPool.release(channel);
+ releaseChannel(channel);
}
} catch (final Exception e) {
throw new EventException(getChannelMessage("Send Failed"), e);
@@ -97,4 +101,11 @@ class NettyEventSender<T> implements EventSender<T> {
private String getChannelMessage(final String message) {
return String.format("%s Remote Address [%s]", message, remoteAddress);
}
+
+ private void releaseChannel(final Channel channel) {
+ if (singleEventPerConnection) {
+ channel.close();
+ }
+ channelPool.release(channel);
+ }
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
index 85024ff..3a83fbd 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventSenderFactory.java
@@ -49,6 +49,7 @@ import java.util.function.Supplier;
*/
public class NettyEventSenderFactory<T> extends EventLoopGroupFactory
implements EventSenderFactory<T> {
private static final int MAX_PENDING_ACQUIRES = 1024;
+ private Integer socketSendBufferSize = null;
private final String address;
@@ -64,6 +65,8 @@ public class NettyEventSenderFactory<T> extends
EventLoopGroupFactory implements
private SSLContext sslContext;
+ private boolean singleEventPerConnection = false;
+
public NettyEventSenderFactory(final String address, final int port, final
TransportProtocol protocol) {
this.address = address;
this.port = port;
@@ -71,6 +74,15 @@ public class NettyEventSenderFactory<T> extends
EventLoopGroupFactory implements
}
/**
+ * Set Socket Send Buffer Size for TCP Sockets
+ *
+ * @param socketSendBufferSize Send Buffer size can be null to use default
setting
+ */
+ public void setSocketSendBufferSize(final Integer socketSendBufferSize) {
+ this.socketSendBufferSize = socketSendBufferSize;
+ }
+
+ /**
* Set Channel Handler Supplier
*
* @param handlerSupplier Channel Handler Supplier
@@ -107,6 +119,16 @@ public class NettyEventSenderFactory<T> extends
EventLoopGroupFactory implements
}
/**
+ * Send a single event for the session and close the connection. Useful
for endpoints which can not be configured
+ * to listen for a delimiter.
+ *
+ * @param singleEventPerConnection true if the connection should be ended
after an event is sent
+ */
+ public void setSingleEventPerConnection(final boolean
singleEventPerConnection) {
+ this.singleEventPerConnection = singleEventPerConnection;
+ }
+
+ /*
* Get Event Sender with connected Channel
*
* @return Connected Event Sender
@@ -130,12 +152,15 @@ public class NettyEventSenderFactory<T> extends
EventLoopGroupFactory implements
private void setChannelOptions(final Bootstrap bootstrap) {
final int timeoutMilliseconds = (int) timeout.toMillis();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
timeoutMilliseconds);
+ if (socketSendBufferSize != null) {
+ bootstrap.option(ChannelOption.SO_SNDBUF, socketSendBufferSize);
+ }
}
private EventSender<T> getConfiguredEventSender(final Bootstrap bootstrap)
{
final SocketAddress remoteAddress = bootstrap.config().remoteAddress();
final ChannelPool channelPool = getChannelPool(bootstrap);
- return new NettyEventSender<>(bootstrap.config().group(), channelPool,
remoteAddress);
+ return new NettyEventSender<>(bootstrap.config().group(), channelPool,
remoteAddress, singleEventPerConnection);
}
private ChannelPool getChannelPool(final Bootstrap bootstrap) {
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
index 6d23161..161622e 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/NettyEventServerFactory.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.nifi.event.transport.EventException;
@@ -107,16 +108,25 @@ public class NettyEventServerFactory extends
EventLoopGroupFactory implements Ev
@Override
public EventServer getEventServer() {
final AbstractBootstrap<?, ?> bootstrap = getBootstrap();
+ setBufferSize(bootstrap);
final EventLoopGroup group = getEventLoopGroup();
bootstrap.group(group);
return getBoundEventServer(bootstrap, group);
}
+ private void setBufferSize(AbstractBootstrap<?, ?> bootstrap) {
+ if (socketReceiveBuffer != null) {
+ bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
+ bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new
FixedRecvByteBufAllocator(socketReceiveBuffer));
+ }
+ }
+
private AbstractBootstrap<?, ?> getBootstrap() {
if (TransportProtocol.UDP.equals(protocol)) {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioDatagramChannel.class);
bootstrap.handler(new
StandardChannelInitializer<>(handlerSupplier));
+
return bootstrap;
} else {
final ServerBootstrap bootstrap = new ServerBootstrap();
@@ -127,10 +137,6 @@ public class NettyEventServerFactory extends
EventLoopGroupFactory implements Ev
bootstrap.childHandler(new
ServerSslHandlerChannelInitializer<>(handlerSupplier, sslContext, clientAuth));
}
- if (socketReceiveBuffer != null) {
- bootstrap.option(ChannelOption.SO_RCVBUF, socketReceiveBuffer);
- }
-
return bootstrap;
}
}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
new file mode 100644
index 0000000..efb130b
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/StreamingNettyEventSenderFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty;
+
+import io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import
org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
+import org.apache.nifi.event.transport.netty.codec.InputStreamMessageEncoder;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.InputStream;
+import java.util.Arrays;
+
+/**
+ * Netty Event Sender Factory for messages in an InputStream
+ */
+public class StreamingNettyEventSenderFactory extends
NettyEventSenderFactory<InputStream> {
+ /**
+ * Netty Event Sender Factory using InputStream. Uses a custom
InputStreamMessageEncoder and a ChunkedWriteHandler.
+ *
+ * @param log Component Log
+ * @param address Remote Address
+ * @param port Remote Port Number
+ * @param protocol Channel Protocol
+ */
+ public StreamingNettyEventSenderFactory(final ComponentLog log, final
String address, final int port, final TransportProtocol protocol) {
+ super(address, port, protocol);
+ final LogExceptionChannelHandler logExceptionChannelHandler = new
LogExceptionChannelHandler(log);
+ final InputStreamMessageEncoder inputStreamMessageEncoder = new
InputStreamMessageEncoder();
+
+ setHandlerSupplier(() -> Arrays.asList(
+ logExceptionChannelHandler,
+ new ChunkedWriteHandler(),
+ inputStreamMessageEncoder
+ ));
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/InputStreamMessageEncoder.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/InputStreamMessageEncoder.java
new file mode 100644
index 0000000..d7f4d84
--- /dev/null
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/netty/codec/InputStreamMessageEncoder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport.netty.codec;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.stream.ChunkedStream;
+import org.apache.nifi.event.transport.netty.DelimitedInputStream;
+
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Message encoder for an InputStream, which wraps the stream in a
ChunkedStream for use with a ChunkedWriter. Can add a delimiter
+ * to the end of the output objects if the InputStream is a
DelimitedInputStream.
+ */
[email protected]
+public class InputStreamMessageEncoder extends
MessageToMessageEncoder<InputStream> {
+
+ @Override
+ protected void encode(ChannelHandlerContext context, InputStream
messageStream, List<Object> out) throws Exception {
+ ChunkedStream chunkedMessage = new ChunkedStream(messageStream);
+ out.add(chunkedMessage);
+
+ // If the message being sent requires a delimiter added to the end of
the message, provide a DelimitedInputStream
+ if (messageStream instanceof DelimitedInputStream) {
+ DelimitedInputStream delimStream = (DelimitedInputStream)
messageStream;
+ out.add(Unpooled.wrappedBuffer(delimStream.getDelimiter()));
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
index 9341136..5fb613c 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/test/java/org/apache/nifi/event/transport/netty/NettyEventSenderTest.java
@@ -47,7 +47,7 @@ public class NettyEventSenderTest {
@Test
public void testClose() {
final SocketAddress socketAddress =
InetSocketAddress.createUnresolved(LOCALHOST,
NetworkUtils.getAvailableTcpPort());
- final NettyEventSender<?> sender = new NettyEventSender<>(group,
channelPool, socketAddress);
+ final NettyEventSender<?> sender = new NettyEventSender<>(group,
channelPool, socketAddress, false);
doReturn(shutdownFuture).when(group).shutdownGracefully();
sender.close();
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
index 7fed8b7..b0daa96 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/pom.xml
@@ -98,6 +98,12 @@
<artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-event-transport</artifactId>
+ <version>1.14.0-SNAPSHOT</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index 8c3f58d..1b2798c 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -20,22 +20,25 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
-import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
-import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
-import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -85,10 +88,8 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
// Putting these properties here so sub-classes don't have to redefine
them, but they are
// not added to the properties by default since not all processors may
need them
-
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP",
"TCP");
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP",
"UDP");
-
public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
.Builder().name("Protocol")
.description("The protocol for communication.")
@@ -115,6 +116,7 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
.name("Timeout")
@@ -122,6 +124,7 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
.required(false)
.defaultValue("10 seconds")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new
PropertyDescriptor.Builder()
.name("Outgoing Message Delimiter")
@@ -140,7 +143,6 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
.defaultValue("false")
.allowableValues("true", "false")
.build();
-
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The Controller Service to use in order to obtain an
SSL Context. If this property is set, " +
@@ -158,14 +160,11 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
.description("FlowFiles that failed to send to the destination are
sent out this relationship.")
.build();
- private static final long SENDER_POOL_POLL_TIMEOUT = 250;
- private static final TimeUnit SENDER_POOL_POLL_TIMEOUT_UNIT =
TimeUnit.MILLISECONDS;
-
private Set<Relationship> relationships;
private List<PropertyDescriptor> descriptors;
protected volatile String transitUri;
- private volatile BlockingQueue<ChannelSender> senderPool;
+ protected EventSender eventSender;
protected final BlockingQueue<FlowFileMessageBatch> completeBatches = new
LinkedBlockingQueue<>();
protected final Set<FlowFileMessageBatch> activeBatches =
Collections.synchronizedSet(new HashSet<>());
@@ -177,6 +176,7 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
descriptors.add(PORT);
descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
descriptors.add(IDLE_EXPIRATION);
+ descriptors.add(TIMEOUT);
descriptors.addAll(getAdditionalProperties());
this.descriptors = Collections.unmodifiableList(descriptors);
@@ -218,18 +218,14 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
@OnScheduled
public void onScheduled(final ProcessContext context) throws IOException {
// initialize the queue of senders, one per task, senders will get
created on the fly in onTrigger
- this.senderPool = new
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+ this.eventSender = getEventSender(context);
this.transitUri = createTransitUri(context);
}
@OnStopped
- public void closeSenders() {
- if (senderPool != null) {
- ChannelSender sender = pollSenderPool();
- while (sender != null) {
- sender.close();
- sender = pollSenderPool();
- }
+ public void closeSenders() throws Exception {
+ if (eventSender != null) {
+ eventSender.close();
}
}
@@ -243,145 +239,30 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
*/
protected abstract String createTransitUri(final ProcessContext context);
- /**
- * Sub-classes create a ChannelSender given a context.
- *
- * @param context the current context
- * @return an implementation of ChannelSender
- * @throws IOException if an error occurs creating the ChannelSender
- */
- protected abstract ChannelSender createSender(final ProcessContext
context) throws IOException;
-
- /**
- * Close any senders that haven't been active with in the given threshold
- *
- * @param idleThreshold the threshold to consider a sender as idle
- * @return the number of connections that were closed as a result of being
idle
- */
- protected PruneResult pruneIdleSenders(final long idleThreshold) {
- int numClosed = 0;
- int numConsidered = 0;
-
- long currentTime = System.currentTimeMillis();
- final List<ChannelSender> putBack = new ArrayList<>();
-
- // if a connection hasn't been used with in the threshold then it gets
closed
- ChannelSender sender;
- while ((sender = pollSenderPool()) != null) {
- numConsidered++;
- if (currentTime > (sender.getLastUsed() + idleThreshold)) {
- getLogger().debug("Closing idle connection...");
- sender.close();
- numClosed++;
- } else {
- putBack.add(sender);
- }
- }
-
- // re-queue senders that weren't idle, but if the queue is full then
close the sender
- for (ChannelSender putBackSender : putBack) {
- boolean returned = senderPool.offer(putBackSender);
- if (!returned) {
- putBackSender.close();
- }
- }
-
- return new PruneResult(numClosed, numConsidered);
- }
-
- /**
- * Helper for sub-classes to create a sender.
- *
- * @param protocol the protocol for the sender
- * @param host the host to send to
- * @param port the port to send to
- * @param timeout the timeout for connecting and communicating over the
channel
- * @param maxSendBufferSize the maximum size of the socket send buffer
- * @param sslContext an SSLContext, or null if not using SSL
- *
- * @return a ChannelSender based on the given properties
- *
- * @throws IOException if an error occurs creating the sender
- */
- protected ChannelSender createSender(final String protocol,
- final String host,
- final int port,
- final int timeout,
- final int maxSendBufferSize,
- final SSLContext sslContext) throws
IOException {
-
- ChannelSender sender;
- if (protocol.equals(UDP_VALUE.getValue())) {
- sender = new DatagramChannelSender(host, port, maxSendBufferSize,
getLogger());
- } else {
- // if an SSLContextService is provided then we make a secure sender
- if (sslContext != null) {
- sender = new SSLSocketChannelSender(host, port,
maxSendBufferSize, sslContext, getLogger());
- } else {
- sender = new SocketChannelSender(host, port,
maxSendBufferSize, getLogger());
- }
- }
-
- sender.setTimeout(timeout);
- sender.open();
- return sender;
- }
-
- /**
- * Helper method to acquire an available ChannelSender from the pool. If
the pool is empty then the a new sender is created.
- *
- * @param context
- * - the current process context.
- *
- * @param session
- * - the current process session.
- * @param flowFile
- * - the FlowFile being processed in this session.
- *
- * @return ChannelSender - the sender that has been acquired or null if no
sender is available and a new sender cannot be created.
- */
- protected ChannelSender acquireSender(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile) {
- ChannelSender sender = pollSenderPool();
- if (sender == null) {
- try {
- getLogger().debug("No available connections, creating a new
one...");
- sender = createSender(context);
- } catch (IOException e) {
- getLogger().error("No available connections, and unable to
create a new one, transferring {} to failure",
- new Object[]{flowFile}, e);
- session.transfer(flowFile, REL_FAILURE);
- session.commitAsync();
- context.yield();
- sender = null;
- }
+ protected EventSender<?> getEventSender(final ProcessContext context) {
+ final String hostname =
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
+ final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
+ final String protocol = getProtocol(context);
+ final boolean singleEventPerConnection =
context.getProperty(CONNECTION_PER_FLOWFILE).getValue() != null ?
context.getProperty(CONNECTION_PER_FLOWFILE).asBoolean() : false;
+
+ final NettyEventSenderFactory factory =
getNettyEventSenderFactory(hostname, port, protocol);
+ factory.setThreadNamePrefix(String.format("%s[%s]",
getClass().getSimpleName(), getIdentifier()));
+ factory.setWorkerThreads(context.getMaxConcurrentTasks());
+ factory.setMaxConnections(context.getMaxConcurrentTasks());
+
factory.setSocketSendBufferSize(context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue());
+ factory.setSingleEventPerConnection(singleEventPerConnection);
+
+ final int timeout =
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ factory.setTimeout(Duration.ofMillis(timeout));
+
+ final PropertyValue sslContextServiceProperty =
context.getProperty(SSL_CONTEXT_SERVICE);
+ if (sslContextServiceProperty.isSet()) {
+ final SSLContextService sslContextService =
sslContextServiceProperty.asControllerService(SSLContextService.class);
+ final SSLContext sslContext = sslContextService.createContext();
+ factory.setSslContext(sslContext);
}
- return sender;
- }
-
-
- /**
- * Helper method to relinquish the ChannelSender back to the pool. If the
sender is disconnected or the pool is full
- * then the sender is closed and discarded.
- *
- * @param sender the sender to return or close
- */
- protected void relinquishSender(final ChannelSender sender) {
- if (sender != null) {
- // if the connection is still open then then try to return the
sender to the pool.
- if (sender.isConnected()) {
- boolean returned = senderPool.offer(sender);
- // if the pool is full then close the sender.
- if (!returned) {
- getLogger().debug("Sender wasn't returned because queue
was full, closing sender");
- sender.close();
- }
- } else {
- // probably already closed here, but quietly close anyway to
be safe.
- getLogger().debug("Sender is not connected, closing sender");
- sender.close();
- }
- }
+ return factory.getEventSender();
}
/**
@@ -612,22 +493,11 @@ public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryPr
return delimiter;
}
- /**
- * Poll Sender Pool when not empty using a timeout to avoid blocking
indefinitely
- *
- * @return Channel Sender or null when not found
- */
- private ChannelSender pollSenderPool() {
- ChannelSender channelSender = null;
-
- if (!senderPool.isEmpty()) {
- try {
- channelSender = senderPool.poll(SENDER_POOL_POLL_TIMEOUT,
SENDER_POOL_POLL_TIMEOUT_UNIT);
- } catch (final InterruptedException e) {
- getLogger().warn("Interrupted while polling for
ChannelSender", e);
- }
- }
+ protected String getProtocol(final ProcessContext context) {
+ return context.getProperty(PROTOCOL).getValue();
+ }
- return channelSender;
+ protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final
String hostname, final int port, final String protocol) {
+ return new ByteArrayNettyEventSenderFactory(getLogger(), hostname,
port, TransportProtocol.valueOf(protocol));
}
}
diff --git
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
index 275e5fa..0a354d6 100644
---
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
+++
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/PutSplunk.java
@@ -25,9 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -36,15 +34,14 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
@@ -109,23 +106,6 @@ public class PutSplunk extends AbstractPutEventProcessor {
}
@Override
- protected ChannelSender createSender(ProcessContext context) throws
IOException {
- final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
- final String host =
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
- final String protocol = context.getProperty(PROTOCOL).getValue();
- final int timeout =
context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final int maxSendBuffer =
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
- final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
- SSLContext sslContext = null;
- if (sslContextService != null) {
- sslContext = sslContextService.createContext();
- }
-
- return createSender(protocol, host, port, timeout, maxSendBuffer,
sslContext);
- }
-
- @Override
public void onTrigger(ProcessContext context, ProcessSessionFactory
sessionFactory) throws ProcessException {
// first complete any batches from previous executions
FlowFileMessageBatch batch;
@@ -136,45 +116,34 @@ public class PutSplunk extends AbstractPutEventProcessor {
// create a session and try to get a FlowFile, if none available then
close any idle senders
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
+
if (flowFile == null) {
- final PruneResult result =
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
- // yield if we closed an idle connection, or if there were no
connections in the first place
- if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 &&
result.getNumConsidered() == 0)) {
- context.yield();
- }
return;
}
- // get a sender from the pool, or create a new one if the pool is empty
- // if we can't create a new connection then route flow files to
failure and yield
- // acquireSender will handle the routing to failure and yielding
- ChannelSender sender = acquireSender(context, session, flowFile);
- if (sender == null) {
- return;
+ String delimiter =
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+ if (delimiter != null) {
+ delimiter = delimiter.replace("\\n", "\n").replace("\\r",
"\r").replace("\\t", "\t");
}
+ // if no delimiter then treat the whole FlowFile as a single message
try {
- String delimiter =
context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
- if (delimiter != null) {
- delimiter = delimiter.replace("\\n", "\n").replace("\\r",
"\r").replace("\\t", "\t");
- }
-
- // if no delimiter then treat the whole FlowFile as a single
message
if (delimiter == null) {
- processSingleMessage(context, session, flowFile, sender);
+ processSingleMessage(context, session, flowFile);
} else {
- processDelimitedMessages(context, session, flowFile, sender,
delimiter);
+ processDelimitedMessages(context, session, flowFile,
delimiter);
}
-
- } finally {
- relinquishSender(sender);
+ } catch (EventException e) {
+ session.transfer(flowFile, REL_FAILURE);
+ session.commitAsync();
+ context.yield();
}
}
/**
* Send the entire FlowFile as a single message.
*/
- private void processSingleMessage(ProcessContext context, ProcessSession
session, FlowFile flowFile, ChannelSender sender) {
+ private void processSingleMessage(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile) {
// copy the contents of the FlowFile to the ByteArrayOutputStream
final ByteArrayOutputStream baos = new
ByteArrayOutputStream((int)flowFile.getSize() + 1);
session.read(flowFile, new InputStreamCallback() {
@@ -200,20 +169,15 @@ public class PutSplunk extends AbstractPutEventProcessor {
activeBatches.add(messageBatch);
// attempt to send the data and add the appropriate range
- try {
- sender.send(buf);
- messageBatch.addSuccessfulRange(0L, flowFile.getSize());
- } catch (IOException e) {
- messageBatch.addFailedRange(0L, flowFile.getSize(), e);
- context.yield();
- }
+ eventSender.sendEvent(buf);
+ messageBatch.addSuccessfulRange(0L, flowFile.getSize());
}
/**
* Read delimited messages from the FlowFile tracking which messages are
sent successfully.
*/
private void processDelimitedMessages(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile,
- final ChannelSender sender, final
String delimiter) {
+ final String delimiter) {
final String protocol = context.getProperty(PROTOCOL).getValue();
final byte[] delimiterBytes =
delimiter.getBytes(StandardCharsets.UTF_8);
@@ -264,13 +228,9 @@ public class PutSplunk extends AbstractPutEventProcessor {
// If the message has no data, ignore it.
if (data.length != 0) {
final long rangeStart = messageStartOffset;
- try {
- sender.send(data);
-
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset);
- messagesSent.incrementAndGet();
- } catch (final IOException e) {
-
messageBatch.addFailedRange(rangeStart, messageEndOffset, e);
- }
+ eventSender.sendEvent(data);
+
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset);
+ messagesSent.incrementAndGet();
}
// reset BAOS so that we can start a new
message.
@@ -320,5 +280,4 @@ public class PutSplunk extends AbstractPutEventProcessor {
return Arrays.copyOfRange(baos.toByteArray(), 0, length);
}
}
-
}
diff --git
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
index a0713a3..272faa5 100644
---
a/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
+++
b/nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestPutSplunk.java
@@ -16,44 +16,64 @@
*/
package org.apache.nifi.processors.splunk;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import javax.net.ssl.SSLContext;
-import java.io.IOException;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
public class TestPutSplunk {
private TestRunner runner;
- private TestablePutSplunk proc;
- private CapturingChannelSender sender;
+ private BlockingQueue<ByteArrayMessage> messages;
+ private EventServer eventServer;
+ private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
+ private final static String OUTGOING_MESSAGE_DELIMITER = "\n";
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+ private final static int VALID_LARGE_FILE_SIZE = 32768;
+ private static final String LOCALHOST = "localhost";
@Before
- public void init() {
- ComponentLog logger = Mockito.mock(ComponentLog.class);
- sender = new CapturingChannelSender("localhost", 12345, 0, logger);
- proc = new TestablePutSplunk(sender);
+ public void setup() throws Exception {
+ runner = TestRunners.newTestRunner(PutSplunk.class);
+ }
- runner = TestRunners.newTestRunner(proc);
- runner.setProperty(PutSplunk.PORT, "12345");
+ @After
+ public void cleanup() {
+ runner.shutdown();
+ shutdownServer();
+ }
+
+ private void shutdownServer() {
+ if (eventServer != null) {
+ eventServer.shutdown();
+ }
}
- @Test
- public void testUDPSendWholeFlowFile() {
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testUDPSendWholeFlowFile() throws Exception {
+ createTestServer(TransportProtocol.UDP);
+ runner.setProperty(PutSplunk.MESSAGE_DELIMITER,
OUTGOING_MESSAGE_DELIMITER);
final String message = "This is one message, should send the whole
FlowFile";
runner.enqueue(message);
@@ -63,16 +83,13 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(1, sender.getMessages().size());
- Assert.assertEquals(message, sender.getMessages().get(0));
+ checkReceivedAllData(message);
}
- @Test
- public void testTCPSendWholeFlowFile() {
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
-
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testTCPSendWholeFlowFile() throws Exception {
+ createTestServer(TransportProtocol.TCP);
final String message = "This is one message, should send the whole
FlowFile";
-
runner.enqueue(message);
runner.run(1);
runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 1);
@@ -80,13 +97,29 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(1, sender.getMessages().size());
- Assert.assertEquals(message + "\n", sender.getMessages().get(0));
+ checkReceivedAllData(message);
+ }
+
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testTCPSendMultipleFlowFiles() throws Exception {
+ createTestServer(TransportProtocol.TCP);
+
+ final String message = "This is one message, should send the whole
FlowFile";
+
+ runner.enqueue(message);
+ runner.enqueue(message);
+ runner.run(2);
+ runner.assertAllFlowFilesTransferred(PutSplunk.REL_SUCCESS, 2);
+
+ final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
+ mockFlowFile.assertContentEquals(message);
+
+ checkReceivedAllData(message, message);
}
- @Test
- public void testTCPSendWholeFlowFileAlreadyHasNewLine() {
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testTCPSendWholeFlowFileAlreadyHasNewLine() throws Exception {
+ createTestServer(TransportProtocol.TCP);
final String message = "This is one message, should send the whole
FlowFile\n";
@@ -97,14 +130,12 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(1, sender.getMessages().size());
- Assert.assertEquals(message, sender.getMessages().get(0));
+ checkReceivedAllData(message.trim());
}
- @Test
- public void testUDPSendDelimitedMessages() {
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
-
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testUDPSendDelimitedMessages() throws Exception {
+ createTestServer(TransportProtocol.UDP);
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
@@ -117,17 +148,14 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(3, sender.getMessages().size());
- Assert.assertEquals("This is message 1", sender.getMessages().get(0));
- Assert.assertEquals("This is message 2", sender.getMessages().get(1));
- Assert.assertEquals("This is message 3", sender.getMessages().get(2));
+ checkReceivedAllData("This is message 1", "This is message 2", "This
is message 3");
}
- @Test
- public void testTCPSendDelimitedMessages() {
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testTCPSendDelimitedMessages() throws Exception {
+ createTestServer(TransportProtocol.TCP);
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// no delimiter at end
final String message = "This is message 1DDThis is message 2DDThis is
message 3";
@@ -139,17 +167,15 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(3, sender.getMessages().size());
- Assert.assertEquals("This is message 1\n",
sender.getMessages().get(0));
- Assert.assertEquals("This is message 2\n",
sender.getMessages().get(1));
- Assert.assertEquals("This is message 3\n",
sender.getMessages().get(2));
+ checkReceivedAllData("This is message 1", "This is message 2", "This
is message 3");
}
- @Test
- public void testTCPSendDelimitedMessagesWithEL() {
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testTCPSendDelimitedMessagesWithEL() throws Exception {
+ createTestServer(TransportProtocol.TCP);
+
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, "${flow.file.delim}");
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// no delimiter at end
final String message = "This is message 1DDThis is message 2DDThis is
message 3";
@@ -164,17 +190,14 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(3, sender.getMessages().size());
- Assert.assertEquals("This is message 1\n",
sender.getMessages().get(0));
- Assert.assertEquals("This is message 2\n",
sender.getMessages().get(1));
- Assert.assertEquals("This is message 3\n",
sender.getMessages().get(2));
+ checkReceivedAllData("This is message 1", "This is message 2", "This
is message 3");
}
- @Test
- public void testTCPSendDelimitedMessagesEndsWithDelimiter() {
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testTCPSendDelimitedMessagesEndsWithDelimiter() throws
Exception {
+ createTestServer(TransportProtocol.TCP);
final String delimiter = "DD";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
// delimiter at end
final String message = "This is message 1DDThis is message 2DDThis is
message 3DD";
@@ -186,17 +209,16 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(3, sender.getMessages().size());
- Assert.assertEquals("This is message 1\n",
sender.getMessages().get(0));
- Assert.assertEquals("This is message 2\n",
sender.getMessages().get(1));
- Assert.assertEquals("This is message 3\n",
sender.getMessages().get(2));
+ checkReceivedAllData("This is message 1", "This is message 2", "This
is message 3");
+
}
- @Test
- public void testTCPSendDelimitedMessagesWithNewLineDelimiter() {
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testTCPSendDelimitedMessagesWithNewLineDelimiter() throws
Exception {
+ createTestServer(TransportProtocol.TCP);
final String delimiter = "\\n";
runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
+ runner.setProperty(PutSplunk.CHARSET, "UTF-8");
final String message = "This is message 1\nThis is message 2\nThis is
message 3";
@@ -207,89 +229,12 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(3, sender.getMessages().size());
- Assert.assertEquals("This is message 1\n",
sender.getMessages().get(0));
- Assert.assertEquals("This is message 2\n",
sender.getMessages().get(1));
- Assert.assertEquals("This is message 3\n",
sender.getMessages().get(2));
- }
-
- @Test
- public void testTCPSendDelimitedMessagesWithErrors() {
- sender.setErrorStart(3);
- sender.setErrorEnd(4);
-
- final String delimiter = "DD";
- runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
-
- // no delimiter at end
- final String success = "This is message 1DDThis is message 2DD";
- final String failure = "This is message 3DDThis is message 4";
- final String message = success + failure;
-
- runner.enqueue(message);
- runner.run(1);
- runner.assertTransferCount(PutSplunk.REL_SUCCESS, 1);
- runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
-
- // first two messages should went out success
- final MockFlowFile successFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
- successFlowFile.assertContentEquals(success);
-
- // second two messages should went to failure
- final MockFlowFile failureFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
- failureFlowFile.assertContentEquals(failure);
-
- // should only have the first two messages
- Assert.assertEquals(2, sender.getMessages().size());
- Assert.assertEquals("This is message 1\n",
sender.getMessages().get(0));
- Assert.assertEquals("This is message 2\n",
sender.getMessages().get(1));
+ checkReceivedAllData("This is message 1", "This is message 2", "This
is message 3");
}
- @Test
- public void testTCPSendDelimitedMessagesWithErrorsInMiddle() {
- sender.setErrorStart(3);
- sender.setErrorEnd(4);
-
- final String delimiter = "DD";
- runner.setProperty(PutSplunk.MESSAGE_DELIMITER, delimiter);
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.TCP_VALUE.getValue());
-
- // no delimiter at end
- final String success = "This is message 1DDThis is message 2DD";
- final String failure = "This is message 3DDThis is message 4DD";
- final String success2 = "This is message 5DDThis is message 6DDThis is
message 7DD";
- final String message = success + failure + success2;
-
- runner.enqueue(message);
- runner.run(1);
- runner.assertTransferCount(PutSplunk.REL_SUCCESS, 2);
- runner.assertTransferCount(PutSplunk.REL_FAILURE, 1);
-
- // first two messages should have went out success
- final MockFlowFile successFlowFile1 =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
- successFlowFile1.assertContentEquals(success);
-
- // last three messages should have went out success
- final MockFlowFile successFlowFile2 =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(1);
- successFlowFile2.assertContentEquals(success2);
-
- // second two messages should have went to failure
- final MockFlowFile failureFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_FAILURE).get(0);
- failureFlowFile.assertContentEquals(failure);
-
- // should only have the first two messages
- Assert.assertEquals(5, sender.getMessages().size());
- Assert.assertEquals("This is message 1\n",
sender.getMessages().get(0));
- Assert.assertEquals("This is message 2\n",
sender.getMessages().get(1));
- Assert.assertEquals("This is message 5\n",
sender.getMessages().get(2));
- Assert.assertEquals("This is message 6\n",
sender.getMessages().get(3));
- Assert.assertEquals("This is message 7\n",
sender.getMessages().get(4));
- }
-
- @Test
- public void testCompletingPreviousBatchOnNextExecution() {
- runner.setProperty(PutSplunk.PROTOCOL, PutSplunk.UDP_VALUE.getValue());
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testCompletingPreviousBatchOnNextExecution() throws Exception {
+ createTestServer(TransportProtocol.UDP);
final String message = "This is one message, should send the whole
FlowFile";
@@ -300,15 +245,13 @@ public class TestPutSplunk {
final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutSplunk.REL_SUCCESS).get(0);
mockFlowFile.assertContentEquals(message);
- Assert.assertEquals(1, sender.getMessages().size());
- Assert.assertEquals(message, sender.getMessages().get(0));
+ checkReceivedAllData(message);
}
- @Test
- public void testUnableToCreateConnectionShouldRouteToFailure() {
- PutSplunk proc = new UnableToConnectPutSplunk();
- runner = TestRunners.newTestRunner(proc);
- runner.setProperty(PutSplunk.PORT, "12345");
+ @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ public void testUnableToCreateConnectionShouldRouteToFailure() throws
InterruptedException {
+ // Set an unreachable port
+ runner.setProperty(PutSplunk.PORT,
String.valueOf(NetworkUtils.getAvailableUdpPort()));
final String message = "This is one message, should send the whole
FlowFile";
@@ -317,84 +260,38 @@ public class TestPutSplunk {
runner.assertAllFlowFilesTransferred(PutSplunk.REL_FAILURE, 1);
}
- /**
- * Extend PutSplunk to use a CapturingChannelSender.
- */
- private static class UnableToConnectPutSplunk extends PutSplunk {
-
- @Override
- protected ChannelSender createSender(String protocol, String host, int
port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws
IOException {
- throw new IOException("Unable to create connection");
- }
+ private void createTestServer(final TransportProtocol protocol) {
+ createTestServer(LOCALHOST, protocol, null);
}
- /**
- * Extend PutSplunk to use a CapturingChannelSender.
- */
- private static class TestablePutSplunk extends PutSplunk {
-
- private ChannelSender sender;
-
- public TestablePutSplunk(ChannelSender channelSender) {
- this.sender = channelSender;
- }
-
- @Override
- protected ChannelSender createSender(String protocol, String host, int
port, int timeout, int maxSendBufferSize, SSLContext sslContext) throws
IOException {
- return sender;
+ private void createTestServer(final String address, final
TransportProtocol protocol, final SSLContext sslContext) {
+ if (protocol == TransportProtocol.UDP) {
+ createTestServer(address, NetworkUtils.getAvailableUdpPort(),
protocol, sslContext);
+ } else {
+ createTestServer(address, NetworkUtils.getAvailableTcpPort(),
protocol, sslContext);
}
}
-
- /**
- * A ChannelSender that captures each message that was sent.
- */
- private static class CapturingChannelSender extends ChannelSender {
-
- private List<String> messages = new ArrayList<>();
- private int count = 0;
- private int errorStart = -1;
- private int errorEnd = -1;
-
- public CapturingChannelSender(String host, int port, int
maxSendBufferSize, ComponentLog logger) {
- super(host, port, maxSendBufferSize, logger);
- }
-
- @Override
- public void open() throws IOException {
-
- }
-
- @Override
- protected void write(byte[] data) throws IOException {
- count++;
- if (errorStart > 0 && count >= errorStart && errorEnd > 0 && count
<= errorEnd) {
- throw new IOException("this is an error");
- }
- messages.add(new String(data, StandardCharsets.UTF_8));
- }
-
- @Override
- public boolean isConnected() {
- return false;
- }
-
- @Override
- public void close() {
-
- }
-
- public List<String> getMessages() {
- return messages;
+ private void createTestServer(final String address, final int port, final
TransportProtocol protocol, final SSLContext sslContext) {
+ messages = new LinkedBlockingQueue<>();
+ runner.setProperty(PutSplunk.PROTOCOL, protocol.name());
+ runner.setProperty(PutSplunk.PORT, String.valueOf(port));
+ final byte[] delimiter = OUTGOING_MESSAGE_DELIMITER.getBytes(CHARSET);
+ NettyEventServerFactory serverFactory = new
ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port,
protocol, delimiter, VALID_LARGE_FILE_SIZE, messages);
+ if (sslContext != null) {
+ serverFactory.setSslContext(sslContext);
}
+ eventServer = serverFactory.getEventServer();
+ }
- public void setErrorStart(int errorStart) {
- this.errorStart = errorStart;
+ private void checkReceivedAllData(final String... sentData) throws
Exception {
+ // check each sent FlowFile was successfully sent and received.
+ for (String item : sentData) {
+ ByteArrayMessage packet = messages.take();
+ assertNotNull(packet);
+ assertArrayEquals(item.getBytes(), packet.getMessage());
}
- public void setErrorEnd(int errorEnd) {
- this.errorEnd = errorEnd;
- }
+ assertNull("Unexpected extra messages found", messages.poll());
}
-
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 2c0f17c..da6e479 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
@@ -24,23 +23,20 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.DelimitedInputStream;
+import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
+import org.apache.nifi.event.transport.netty.StreamingNettyEventSenderFactory;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
-import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
-import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
-
-import javax.net.ssl.SSLContext;
-import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
@@ -97,31 +93,6 @@ import java.util.concurrent.TimeUnit;
public class PutTCP extends AbstractPutEventProcessor {
/**
- * Creates a concrete instance of a ChannelSender object to use for
sending messages over a TCP stream.
- *
- * @param context
- * - the current process context.
- *
- * @return ChannelSender object.
- */
- @Override
- protected ChannelSender createSender(final ProcessContext context) throws
IOException {
- final String protocol = TCP_VALUE.getValue();
- final String hostname =
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
- final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
- final int timeout =
context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
- final int bufferSize =
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
- final SSLContextService sslContextService = (SSLContextService)
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService();
-
- SSLContext sslContext = null;
- if (sslContextService != null) {
- sslContext = sslContextService.createContext();
- }
-
- return createSender(protocol, hostname, port, timeout, bufferSize,
sslContext);
- }
-
- /**
* Creates a Universal Resource Identifier (URI) for this processor.
Constructs a URI of the form TCP://< host >:< port > where the host and port
* values are taken from the configured property values.
*
@@ -168,46 +139,25 @@ public class PutTCP extends AbstractPutEventProcessor {
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
- final PruneResult result =
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
- // yield if we closed an idle connection, or if there were no
connections in the first place
- if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 &&
result.getNumConsidered() == 0)) {
- context.yield();
- }
return;
}
- ChannelSender sender = acquireSender(context, session, flowFile);
- if (sender == null) {
- return;
- }
-
- // really shouldn't happen since we know the protocol is TCP here, but
this is more graceful so we
- // can cast to a SocketChannelSender later in order to obtain the
OutputStream
- if (!(sender instanceof SocketChannelSender)) {
- getLogger().error("Processor can only be used with a
SocketChannelSender, but obtained: " + sender.getClass().getCanonicalName());
- context.yield();
- return;
- }
-
- boolean closeSender = isConnectionPerFlowFile(context);
try {
- // We might keep the connection open across invocations of the
processor so don't auto-close this
- final OutputStream out =
((SocketChannelSender)sender).getOutputStream();
- final String delimiter = getOutgoingMessageDelimiter(context,
flowFile);
-
- final StopWatch stopWatch = new StopWatch(true);
- try (final InputStream rawIn = session.read(flowFile);
- final BufferedInputStream in = new
BufferedInputStream(rawIn)) {
- IOUtils.copy(in, out);
- if (delimiter != null) {
- final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
- out.write(delimiter.getBytes(charSet), 0,
delimiter.length());
+ StopWatch stopWatch = new StopWatch(true);
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ InputStream event = in;
+
+ String delimiter = getOutgoingMessageDelimiter(context,
flowFile);
+ if (delimiter != null) {
+ final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
+ event = new DelimitedInputStream(in,
delimiter.getBytes(charSet));
+ }
+
+ eventSender.sendEvent(event);
}
- out.flush();
- } catch (final Exception e) {
- closeSender = true;
- throw e;
- }
+ });
session.getProvenanceReporter().send(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
@@ -215,14 +165,6 @@ public class PutTCP extends AbstractPutEventProcessor {
} catch (Exception e) {
onFailure(context, session, flowFile);
getLogger().error("Exception while handling a process session,
transferring {} to failure.", new Object[] { flowFile }, e);
- } finally {
- if (closeSender) {
- getLogger().debug("Closing sender");
- sender.close();
- } else {
- getLogger().debug("Relinquishing sender");
- relinquishSender(sender);
- }
}
}
@@ -243,15 +185,13 @@ public class PutTCP extends AbstractPutEventProcessor {
context.yield();
}
- /**
- * Gets the current value of the "Connection Per FlowFile" property.
- *
- * @param context
- * - the current process context.
- *
- * @return boolean value - true if a connection per FlowFile is specified.
- */
- protected boolean isConnectionPerFlowFile(final ProcessContext context) {
- return
context.getProperty(CONNECTION_PER_FLOWFILE).getValue().equalsIgnoreCase("true");
+ @Override
+ protected String getProtocol(final ProcessContext context) {
+ return TCP_VALUE.getValue();
+ }
+
+ @Override
+ protected NettyEventSenderFactory<?> getNettyEventSenderFactory(final
String hostname, final int port, final String protocol) {
+ return new StreamingNettyEventSenderFactory(getLogger(), hostname,
port, TransportProtocol.valueOf(protocol));
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
index 6bcff04..a6ce71c 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutUDP.java
@@ -23,14 +23,12 @@ import
org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
-import org.apache.nifi.processor.util.put.sender.ChannelSender;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
@@ -82,24 +80,6 @@ import java.util.concurrent.TimeUnit;
public class PutUDP extends AbstractPutEventProcessor {
/**
- * Creates a concrete instance of a ChannelSender object to use for
sending UDP datagrams.
- *
- * @param context
- * - the current process context.
- *
- * @return ChannelSender object.
- */
- @Override
- protected ChannelSender createSender(final ProcessContext context) throws
IOException {
- final String protocol = UDP_VALUE.getValue();
- final String hostname =
context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
- final int port =
context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
- final int bufferSize =
context.getProperty(MAX_SOCKET_SEND_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
- return createSender(protocol, hostname, port, 0, bufferSize, null);
- }
-
- /**
* Creates a Universal Resource Identifier (URI) for this processor.
Constructs a URI of the form UDP://host:port where the host and port values are
taken from the configured property values.
*
* @param context
@@ -131,31 +111,21 @@ public class PutUDP extends AbstractPutEventProcessor {
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
- final PruneResult result =
pruneIdleSenders(context.getProperty(IDLE_EXPIRATION).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
- // yield if we closed an idle connection, or if there were no
connections in the first place
- if (result.getNumClosed() > 0 || (result.getNumClosed() == 0 &&
result.getNumConsidered() == 0)) {
- context.yield();
- }
- return;
- }
-
- ChannelSender sender = acquireSender(context, session, flowFile);
- if (sender == null) {
return;
}
try {
- byte[] content = readContent(session, flowFile);
+ final byte[] content = readContent(session, flowFile);
StopWatch stopWatch = new StopWatch(true);
- sender.send(content);
+ if (content != null) {
+ eventSender.sendEvent(content);
+ }
session.getProvenanceReporter().send(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowFile, REL_SUCCESS);
session.commitAsync();
} catch (Exception e) {
getLogger().error("Exception while handling a process session,
transferring {} to failure.", new Object[] { flowFile }, e);
onFailure(context, session, flowFile);
- } finally {
- relinquishSender(sender);
}
}
@@ -176,8 +146,6 @@ public class PutUDP extends AbstractPutEventProcessor {
context.yield();
}
-
-
/**
* Helper method to read the FlowFile content stream into a byte array.
*
@@ -199,4 +167,9 @@ public class PutUDP extends AbstractPutEventProcessor {
return baos.toByteArray();
}
+
+ @Override
+ protected String getProtocol(final ProcessContext context) {
+ return UDP_VALUE.getValue();
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
index 815a639..55d083f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenSyslog.java
@@ -164,7 +164,6 @@ public class TestListenSyslog {
runner.run(1, STOP_ON_FINISH_DISABLED);
sendMessages(protocol, port, LineEnding.UNIX, VALID_MESSAGE);
-
runner.run(1, STOP_ON_FINISH_ENABLED, INITIALIZE_DISABLED);
final List<MockFlowFile> invalidFlowFiles =
runner.getFlowFilesForRelationship(ListenSyslog.REL_INVALID);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index e608f13..2a9e60b 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -17,8 +17,12 @@
package org.apache.nifi.processors.standard;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.nifi.processors.standard.util.TCPTestServer;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
@@ -32,15 +36,12 @@ import org.junit.Test;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
-import javax.net.ServerSocketFactory;
import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocketFactory;
-import java.net.InetAddress;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -52,7 +53,6 @@ public class TestPutTCP {
private final static int MIN_VALID_PORT = 1;
private final static int MAX_VALID_PORT = 65535;
private final static int MAX_INVALID_PORT = 65536;
- private final static int BUFFER_SIZE = 1024;
private final static int VALID_LARGE_FILE_SIZE = 32768;
private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int LOAD_TEST_ITERATIONS = 500;
@@ -68,16 +68,17 @@ public class TestPutTCP {
@Rule
public Timeout timeout = new Timeout(30, TimeUnit.SECONDS);
- private TCPTestServer server;
+ private EventServer eventServer;
private int port;
- private ArrayBlockingQueue<List<Byte>> received;
+ private TransportProtocol PROTOCOL = TransportProtocol.TCP;
private TestRunner runner;
+ private BlockingQueue<ByteArrayMessage> messages;
@Before
public void setup() throws Exception {
- received = new ArrayBlockingQueue<>(BUFFER_SIZE);
runner = TestRunners.newTestRunner(PutTCP.class);
runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
+ port = NetworkUtils.getAvailableTcpPort();
}
@After
@@ -103,11 +104,10 @@ public class TestPutTCP {
@Test
public void testRunSuccess() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
- assertServerConnections(1);
}
@Test
@@ -116,7 +116,6 @@ public class TestPutTCP {
final SSLContext sslContext =
SslContextUtils.createSslContext(tlsConfiguration);
assertNotNull("SSLContext not found", sslContext);
-
final String identifier = SSLContextService.class.getName();
final SSLContextService sslContextService =
Mockito.mock(SSLContextService.class);
Mockito.when(sslContextService.getIdentifier()).thenReturn(identifier);
@@ -124,28 +123,24 @@ public class TestPutTCP {
runner.addControllerService(identifier, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
-
- final SSLServerSocketFactory serverSocketFactory =
sslContext.getServerSocketFactory();
- createTestServer(OUTGOING_MESSAGE_DELIMITER, false,
serverSocketFactory);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port, sslContext);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
- assertServerConnections(1);
}
@Test
public void testRunSuccessServerVariableExpression() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
- assertServerConnections(1);
}
@Test
public void testRunSuccessPruneSenders() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(VALID_FILES);
assertTransfers(VALID_FILES.length);
assertMessagesReceived(VALID_FILES);
@@ -156,31 +151,28 @@ public class TestPutTCP {
runner.clearTransferState();
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
- assertServerConnections(2);
}
@Test
public void testRunSuccessMultiCharDelimiter() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
configureProperties(TCP_SERVER_ADDRESS,
OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
- assertServerConnections(1);
}
@Test
public void testRunSuccessConnectionPerFlowFile() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER, true);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
true);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
- assertServerConnections(VALID_FILES.length);
}
@Test
public void testRunSuccessConnectionFailure() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
@@ -189,67 +181,64 @@ public class TestPutTCP {
Thread.sleep(500);
runner.assertQueueEmpty();
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(VALID_FILES);
assertMessagesReceived(VALID_FILES);
- assertServerConnections(1);
}
@Test
public void testRunSuccessEmptyFile() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
sendTestData(EMPTY_FILE);
assertTransfers(1);
runner.assertQueueEmpty();
- assertServerConnections(1);
}
@Test
public void testRunSuccessLargeValidFile() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
true);
+ createTestServer(TCP_SERVER_ADDRESS, port);
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData);
assertMessagesReceived(testData);
- assertServerConnections(testData.length);
}
@Test
public void testRunSuccessFiveHundredMessages() throws Exception {
- createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
+ createTestServer(TCP_SERVER_ADDRESS, port);
Thread.sleep(1000);
final String[] testData = createContent(VALID_SMALL_FILE_SIZE);
- configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER,
false);
sendTestData(testData, LOAD_TEST_ITERATIONS, LOAD_TEST_THREAD_COUNT);
assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
- assertServerConnections(1);
}
- private void createTestServer(final String delimiter) throws Exception {
- createTestServer(delimiter, false);
- }
-
- private void createTestServer(final String delimiter, final boolean
closeOnMessageReceived) throws Exception {
- createTestServer(delimiter, closeOnMessageReceived,
ServerSocketFactory.getDefault());
+ private void createTestServer(final String address, final int port, final
SSLContext sslContext) throws Exception {
+ messages = new LinkedBlockingQueue<>();
+ final byte[] delimiter = getDelimiter();
+ NettyEventServerFactory serverFactory = new
ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port,
PROTOCOL, delimiter, VALID_LARGE_FILE_SIZE, messages);
+ if (sslContext != null) {
+ serverFactory.setSslContext(sslContext);
+ }
+ eventServer = serverFactory.getEventServer();
}
- private void createTestServer(final String delimiter, final boolean
closeOnMessageReceived, final ServerSocketFactory serverSocketFactory) throws
Exception {
- server = new TCPTestServer(InetAddress.getByName(TCP_SERVER_ADDRESS),
received, delimiter, closeOnMessageReceived);
- server.startServer(serverSocketFactory);
- port = server.getPort();
+ private void createTestServer(final String address, final int port) throws
Exception {
+ createTestServer(address, port, null);
}
private void shutdownServer() {
- if (server != null) {
- server.shutdown();
+ if (eventServer != null) {
+ eventServer.shutdown();
}
}
private void configureProperties(String host, String
outgoingMessageDelimiter, boolean connectionPerFlowFile) {
runner.setProperty(PutTCP.HOSTNAME, host);
runner.setProperty(PutTCP.PORT, Integer.toString(port));
+
if (outgoingMessageDelimiter != null) {
runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER,
outgoingMessageDelimiter);
}
@@ -285,23 +274,16 @@ public class TestPutTCP {
private void assertMessagesReceived(final String[] sentData, final int
iterations) throws Exception {
for (int i = 0; i < iterations; i++) {
for (String item : sentData) {
- final List<Byte> message = received.take();
+ final ByteArrayMessage message = messages.take();
assertNotNull(String.format("Message [%d] not found", i),
message);
- final Byte[] messageBytes = new Byte[message.size()];
- assertArrayEquals(item.getBytes(),
ArrayUtils.toPrimitive(message.toArray(messageBytes)));
+ assert(Arrays.asList(sentData).contains(new
String(message.getMessage())));
}
}
runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length *
iterations);
runner.clearTransferState();
- assertNull("Unexpected Message Found", received.poll());
- }
-
- private void assertServerConnections(final int connections) throws
InterruptedException {
- while (server.getTotalConnections() != connections) {
- Thread.sleep(10);
- }
+ assertNull("Unexpected extra messages found", messages.poll());
}
private String[] createContent(final int size) {
@@ -313,4 +295,13 @@ public class TestPutTCP {
return new String[] { new String(content) };
}
+
+ private byte[] getDelimiter() {
+ String delimiter =
runner.getProcessContext().getProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER).getValue();
+ if (delimiter != null) {
+ return delimiter.getBytes();
+ } else {
+ return null;
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
index 813a30a..69e162e 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutUDP.java
@@ -20,10 +20,17 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
@@ -38,7 +45,9 @@ public class TestPutUDP {
private final static String UDP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE
+ "}";
private final static String UNKNOWN_HOST = "fgdsfgsdffd";
private final static String INVALID_IP_ADDRESS = "300.300.300.300";
- private final static int BUFFER_SIZE = 1024;
+ private static final String DELIMITER = "\n";
+ private static final Charset CHARSET = StandardCharsets.UTF_8;
+ private final static int MAX_FRAME_LENGTH = 32800;
private final static int VALID_LARGE_FILE_SIZE = 32768;
private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int INVALID_LARGE_FILE_SIZE = 1000000;
@@ -51,9 +60,12 @@ public class TestPutUDP {
private final static int DEFAULT_TEST_TIMEOUT_PERIOD = 10000;
private final static int LONG_TEST_TIMEOUT_PERIOD = 100000;
- private UDPTestServer server;
private TestRunner runner;
- private ArrayBlockingQueue<DatagramPacket> recvQueue;
+ private int port;
+ private TransportProtocol PROTOCOL = TransportProtocol.UDP;
+ private EventServer eventServer;
+ private BlockingQueue<ByteArrayMessage> messages;
+
// Test Data
private final static String[] EMPTY_FILE = { "" };
@@ -61,15 +73,18 @@ public class TestPutUDP {
@Before
public void setup() throws Exception {
- createTestServer(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
runner = TestRunners.newTestRunner(PutUDP.class);
runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
+ port = NetworkUtils.getAvailableUdpPort();
+ createTestServer(UDP_SERVER_ADDRESS, port, VALID_LARGE_FILE_SIZE);
}
- private void createTestServer(final String address, final int port, final
int recvQueueSize) throws Exception {
- recvQueue = new ArrayBlockingQueue<DatagramPacket>(recvQueueSize);
- server = new UDPTestServer(InetAddress.getByName(address), port,
recvQueue);
- server.startServer();
+ private void createTestServer(final String address, final int port, final
int frameSize) throws Exception {
+ messages = new LinkedBlockingQueue<>();
+ final byte[] delimiter = DELIMITER.getBytes(CHARSET);
+ NettyEventServerFactory serverFactory = new
ByteArrayMessageNettyEventServerFactory(runner.getLogger(), address, port,
PROTOCOL, delimiter, frameSize, messages);
+ serverFactory.setSocketReceiveBuffer(MAX_FRAME_LENGTH);
+ eventServer = serverFactory.getEventServer();
}
@After
@@ -79,22 +94,12 @@ public class TestPutUDP {
}
private void removeTestServer() {
- if (server != null) {
- server.shutdownServer();
- server = null;
+ if (eventServer != null) {
+ eventServer.shutdown();
+ eventServer = null;
}
}
- private byte[] getPacketData(final DatagramPacket packet) {
- final int length = packet.getLength();
- final byte[] packetData = packet.getData();
- final byte[] resizedPacketData = new byte[length];
- for (int i = 0; i < length; i++) {
- resizedPacketData[i] = packetData[i];
- }
- return resizedPacketData;
- }
-
@Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
public void testValidFiles() throws Exception {
configureProperties(UDP_SERVER_ADDRESS, true);
@@ -120,8 +125,8 @@ public class TestPutUDP {
checkInputQueueIsEmpty();
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
- public void testlargeValidFile() throws Exception {
+ @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
+ public void testLargeValidFile() throws Exception {
configureProperties(UDP_SERVER_ADDRESS, true);
final String[] testData = createContent(VALID_LARGE_FILE_SIZE);
sendTestData(testData);
@@ -130,19 +135,13 @@ public class TestPutUDP {
}
@Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
- public void testlargeInvalidFile() throws Exception {
+ public void testLargeInvalidFile() throws Exception {
configureProperties(UDP_SERVER_ADDRESS, true);
String[] testData = createContent(INVALID_LARGE_FILE_SIZE);
sendTestData(testData);
checkRelationships(0, testData.length);
checkNoDataReceived();
checkInputQueueIsEmpty();
-
- // Check that the processor recovers and can send the next valid file
- testData = createContent(VALID_LARGE_FILE_SIZE);
- sendTestData(testData);
- checkReceivedAllData(testData);
- checkInputQueueIsEmpty();
}
@Ignore("This test is failing intermittently as documented in NIFI-4288")
@@ -165,16 +164,16 @@ public class TestPutUDP {
checkInputQueueIsEmpty();
}
- @Test(timeout = DEFAULT_TEST_TIMEOUT_PERIOD)
+ @Test(timeout = LONG_TEST_TIMEOUT_PERIOD)
public void testReconfiguration() throws Exception {
configureProperties(UDP_SERVER_ADDRESS, true);
sendTestData(VALID_FILES);
checkReceivedAllData(VALID_FILES);
- reset(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
+ reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH);
configureProperties(UDP_SERVER_ADDRESS, true);
sendTestData(VALID_FILES);
checkReceivedAllData(VALID_FILES);
- reset(UDP_SERVER_ADDRESS, 0, BUFFER_SIZE);
+ reset(UDP_SERVER_ADDRESS, port, MAX_FRAME_LENGTH);
configureProperties(UDP_SERVER_ADDRESS, true);
sendTestData(VALID_FILES);
checkReceivedAllData(VALID_FILES);
@@ -190,15 +189,17 @@ public class TestPutUDP {
checkInputQueueIsEmpty();
}
- private void reset(final String address, final int port, final int
recvQueueSize) throws Exception {
+ private void reset(final String address, final int port, final int
frameSize) throws Exception {
runner.clearTransferState();
removeTestServer();
- createTestServer(address, port, recvQueueSize);
+ createTestServer(address, port, frameSize);
}
private void configureProperties(final String host, final boolean
expectValid) {
runner.setProperty(PutUDP.HOSTNAME, host);
- runner.setProperty(PutUDP.PORT,
Integer.toString(server.getLocalPort()));
+ runner.setProperty(PutUDP.PORT, Integer.toString(port));
+ runner.setProperty(PutUDP.MAX_SOCKET_SEND_BUFFER_SIZE, "40000B");
+
if (expectValid) {
runner.assertValid();
} else {
@@ -231,7 +232,7 @@ public class TestPutUDP {
private void checkNoDataReceived() throws Exception {
Thread.sleep(DATA_WAIT_PERIOD);
- assertNull(recvQueue.poll());
+ assertNull("Unexpected extra messages found", messages.poll());
}
private void checkInputQueueIsEmpty() {
@@ -244,18 +245,17 @@ public class TestPutUDP {
private void checkReceivedAllData(final String[] sentData, final int
iterations) throws Exception {
// check each sent FlowFile was successfully sent and received.
- for (String item : sentData) {
+ for (String item : sentData) {
for (int i = 0; i < iterations; i++) {
- DatagramPacket packet = recvQueue.take();
+ ByteArrayMessage packet = messages.take();
assertNotNull(packet);
- assertArrayEquals(item.getBytes(), getPacketData(packet));
+ assertArrayEquals(item.getBytes(), packet.getMessage());
}
}
runner.assertTransferCount(PutUDP.REL_SUCCESS, sentData.length *
iterations);
- // Check that we have no unexpected extra data.
- assertNull(recvQueue.poll());
+ assertNull("Unexpected extra messages found", messages.poll());
}
private String[] createContent(final int size) {
@@ -265,6 +265,6 @@ public class TestPutUDP {
content[i] = CONTENT_CHAR;
}
- return new String[] { new String(content) };
+ return new String[] { new String(content).concat("\n") };
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
deleted file mode 100644
index 2549ffc..0000000
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/TCPTestServer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import org.apache.nifi.io.socket.SocketUtils;
-
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import javax.net.ServerSocketFactory;
-
-public class TCPTestServer implements Runnable {
-
- private final InetAddress ipAddress;
- private final String messageDelimiter;
- private final ArrayBlockingQueue<List<Byte>> queue;
- private final AtomicInteger totalConnections = new AtomicInteger();
- private final boolean closeOnMessageReceived;
-
- private volatile ServerSocket serverSocket;
- private volatile Socket connectionSocket;
- private int port;
-
- public TCPTestServer(final InetAddress ipAddress, final
ArrayBlockingQueue<List<Byte>> queue, final String messageDelimiter, final
boolean closeOnMessageReceived) {
- this.ipAddress = ipAddress;
- this.queue = queue;
- this.messageDelimiter = messageDelimiter;
- this.closeOnMessageReceived = closeOnMessageReceived;
- }
-
- public synchronized void startServer(final ServerSocketFactory
serverSocketFactory) throws Exception {
- if (!isServerRunning()) {
- if (serverSocketFactory == null) {
- serverSocket = new ServerSocket(0, 0, ipAddress);
- } else {
- serverSocket = serverSocketFactory.createServerSocket(0, 0,
ipAddress);
- }
- Thread t = new Thread(this);
- t.setName(this.getClass().getSimpleName());
- t.start();
- port = serverSocket.getLocalPort();
- }
- }
-
- public synchronized void shutdown() {
- shutdownConnection();
- shutdownServer();
- }
-
- public int getPort(){
- return port;
- }
-
- private synchronized void shutdownServer() {
- if (isServerRunning()) {
- SocketUtils.closeQuietly(serverSocket);
- }
- }
-
- private synchronized void shutdownConnection() {
- if (isConnected()) {
- SocketUtils.closeQuietly(connectionSocket);
- }
- }
-
- private void storeReceivedMessage(final List<Byte> message) {
- queue.add(message);
- if (closeOnMessageReceived) {
- shutdownConnection();
- }
- }
-
- private boolean isServerRunning() {
- return serverSocket != null && !serverSocket.isClosed();
- }
-
- private boolean isConnected() {
- return connectionSocket != null && !connectionSocket.isClosed();
- }
-
- public int getTotalConnections() {
- return totalConnections.get();
- }
-
- protected boolean isDelimiterPresent(final List<Byte> message) {
- if (messageDelimiter != null && message.size() >=
messageDelimiter.length()) {
- for (int i = 1; i <= messageDelimiter.length(); i++) {
- if (message.get(message.size() - i) ==
messageDelimiter.charAt(messageDelimiter.length() - i)) {
- if (i == messageDelimiter.length()) {
- return true;
- }
- } else {
- break;
- }
- }
- }
- return false;
- }
-
- protected boolean removeDelimiter(final List<Byte> message) {
- if (isDelimiterPresent(message)) {
- final int messageSize = message.size();
- for (int i = 1; i <= messageDelimiter.length(); i++) {
- message.remove(messageSize - i);
- }
- return true;
- }
-
- return false;
- }
-
- @Override
- public void run() {
- try {
- while (isServerRunning()) {
- connectionSocket = serverSocket.accept();
- totalConnections.incrementAndGet();
- final InputStream inputStream =
connectionSocket.getInputStream();
- while (isConnected()) {
- final List<Byte> message = new ArrayList<>();
- while (true) {
- final int c = inputStream.read();
- if (c < 0) {
- if (!message.isEmpty()) {
- storeReceivedMessage(message);
- }
- shutdownConnection();
- break;
- }
-
- message.add((byte) c);
-
- if (removeDelimiter(message)) {
- storeReceivedMessage(message);
- break;
- }
- }
- }
- }
- } catch (Exception e) {
- // Do Nothing
- } finally {
- shutdown();
- }
-
- }
-}