This is an automated email from the ASF dual-hosted git repository.
greyp pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 19b13b060d NIFI-11889 Added Record-oriented Transmission to PutTCP
19b13b060d is described below
commit 19b13b060dd32a15e0b2be267a67c2c320ae4acd
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Jul 31 15:19:55 2023 -0500
NIFI-11889 Added Record-oriented Transmission to PutTCP
This closes #7554
Signed-off-by: Paul Grey <[email protected]>
---
.../util/put/AbstractPutEventProcessor.java | 37 +++--
.../apache/nifi/processors/standard/PutTCP.java | 163 ++++++++++++++++++---
.../standard/property/TransmissionStrategy.java | 52 +++++++
.../nifi/processors/standard/TestPutTCP.java | 153 +++++++++++++++++--
4 files changed, 358 insertions(+), 47 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index d9b85775a8..f2aa825b2e 100644
---
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -54,19 +54,21 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
public static final PropertyDescriptor HOSTNAME = new
PropertyDescriptor.Builder()
.name("Hostname")
- .description("The ip address or hostname of the destination.")
+ .description("Destination hostname or IP address")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("localhost")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- public static final PropertyDescriptor PORT = new PropertyDescriptor
- .Builder().name("Port")
- .description("The port on the destination.")
+
+ public static final PropertyDescriptor PORT = new
PropertyDescriptor.Builder()
+ .name("Port")
+ .description("Destination port number")
.required(true)
.addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+
public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new
PropertyDescriptor.Builder()
.name("Max Size of Socket Send Buffer")
.description("The maximum size of the socket send buffer that
should be used. This is a suggestion to the Operating System " +
@@ -76,8 +78,9 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
.defaultValue("1 MB")
.required(true)
.build();
- public static final PropertyDescriptor IDLE_EXPIRATION = new
PropertyDescriptor
- .Builder().name("Idle Connection Expiration")
+
+ public static final PropertyDescriptor IDLE_EXPIRATION = new
PropertyDescriptor.Builder()
+ .name("Idle Connection Expiration")
.description("The amount of time a connection should be held open
without being used before closing the connection. A value of 0 seconds will
disable this feature.")
.required(true)
.defaultValue("15 seconds")
@@ -89,13 +92,14 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
// not added to the properties by default since not all processors may
need them
public static final AllowableValue TCP_VALUE = new AllowableValue("TCP",
"TCP");
public static final AllowableValue UDP_VALUE = new AllowableValue("UDP",
"UDP");
- public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
- .Builder().name("Protocol")
+ public static final PropertyDescriptor PROTOCOL = new
PropertyDescriptor.Builder()
+ .name("Protocol")
.description("The protocol for communication.")
.required(true)
.allowableValues(TCP_VALUE, UDP_VALUE)
.defaultValue(TCP_VALUE.getValue())
.build();
+
public static final PropertyDescriptor MESSAGE_DELIMITER = new
PropertyDescriptor.Builder()
.name("Message Delimiter")
.description("Specifies the delimiter to use for splitting apart
multiple messages within a single FlowFile. "
@@ -109,6 +113,7 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+
public static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies the character set of the data being sent.")
@@ -117,6 +122,7 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+
public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
.name("Timeout")
.description("The timeout for connecting to and communicating with
the destination. Does not apply to UDP")
@@ -125,6 +131,7 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+
public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new
PropertyDescriptor.Builder()
.name("Outgoing Message Delimiter")
.description("Specifies the delimiter to use when sending messages
out over the same TCP stream. The delimiter is appended to each FlowFile
message "
@@ -135,6 +142,7 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
+
public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new
PropertyDescriptor.Builder()
.name("Connection Per FlowFile")
.description("Specifies whether to send each FlowFile's content on
an individual connection.")
@@ -142,10 +150,10 @@ public abstract class AbstractPutEventProcessor<T>
extends AbstractSessionFactor
.defaultValue("false")
.allowableValues("true", "false")
.build();
+
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
.name("SSL Context Service")
- .description("The Controller Service to use in order to obtain an
SSL Context. If this property is set, " +
- "messages will be sent over a secure connection.")
+ .description("Specifies the SSL Context Service to enable TLS
socket communication")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
@@ -154,6 +162,7 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
.name("success")
.description("FlowFiles that are sent successfully to the
destination are sent out this relationship.")
.build();
+
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles that failed to send to the destination are
sent out this relationship.")
@@ -414,14 +423,14 @@ public abstract class AbstractPutEventProcessor<T>
extends AbstractSessionFactor
}
if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
- getLogger().info("Completed processing {} but sent 0
FlowFiles", new Object[] {flowFile});
+ getLogger().info("Completed processing {} but sent 0
FlowFiles", flowFile);
session.transfer(flowFile, REL_SUCCESS);
session.commitAsync();
return;
}
if (successfulRanges.isEmpty()) {
- getLogger().error("Failed to send {}; routing to 'failure';
last failure reason reported was {};", new Object[] {flowFile,
lastFailureReason});
+ getLogger().error("Failed to send {}; routing to 'failure';
last failure reason reported was {};", flowFile, lastFailureReason);
final FlowFile penalizedFlowFile = session.penalize(flowFile);
session.transfer(penalizedFlowFile, REL_FAILURE);
session.commitAsync();
@@ -432,7 +441,7 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
session.getProvenanceReporter().send(flowFile, transitUri,
"Sent " + successfulRanges.size() + " messages;", transferMillis);
session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("Successfully sent {} messages for {} in {}
millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
+ getLogger().info("Successfully sent {} messages for {} in {}
millis", successfulRanges.size(), flowFile, transferMillis);
session.commitAsync();
return;
}
@@ -444,7 +453,7 @@ public abstract class AbstractPutEventProcessor<T> extends
AbstractSessionFactor
transferRanges(failedRanges, REL_FAILURE);
session.remove(flowFile);
getLogger().error("Successfully sent {} messages, but failed to
send {} messages; the last error received was {}",
- new Object[] {successfulRanges.size(),
failedRanges.size(), lastFailureReason});
+ successfulRanges.size(), failedRanges.size(),
lastFailureReason);
session.commitAsync();
}
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 639d0220ff..cc2c5d1774 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -33,33 +35,89 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
+import org.apache.nifi.processors.standard.property.TransmissionStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
-@CapabilityDescription("The PutTCP processor receives a FlowFile and transmits
the FlowFile content over a TCP connection to the configured TCP server. "
- + "By default, the FlowFiles are transmitted over the same TCP
connection (or pool of TCP connections if multiple input threads are
configured). "
- + "To assist the TCP server with determining message boundaries, an
optional \"Outgoing Message Delimiter\" string can be configured which is
appended "
- + "to the end of each FlowFiles content when it is transmitted over
the TCP connection. An optional \"Connection Per FlowFile\" parameter can be "
- + "specified to change the behaviour so that each FlowFiles content is
transmitted over a single TCP connection which is opened when the FlowFile "
- + "is received and closed after the FlowFile has been sent. This
option should only be used for low message volume scenarios, otherwise the
platform " + "may run out of TCP sockets.")
+@CapabilityDescription("Sends serialized FlowFiles or Records over TCP to a
configurable destination with optional support for TLS")
@InputRequirement(Requirement.INPUT_REQUIRED)
@SeeAlso({ListenTCP.class, PutUDP.class})
@Tags({ "remote", "egress", "put", "tcp" })
@SupportsBatching
+@WritesAttributes({
+ @WritesAttribute(attribute = PutTCP.RECORD_COUNT_TRANSMITTED,
description = "Count of records transmitted to configured destination address")
+})
public class PutTCP extends AbstractPutEventProcessor<InputStream> {
+ public static final String RECORD_COUNT_TRANSMITTED =
"record.count.transmitted";
+
+ static final PropertyDescriptor TRANSMISSION_STRATEGY = new
PropertyDescriptor.Builder()
+ .name("Transmission Strategy")
+ .displayName("Transmission Strategy")
+ .description("Specifies the strategy used for reading input
FlowFiles and transmitting messages to the destination socket address")
+ .required(true)
+ .allowableValues(TransmissionStrategy.class)
+ .defaultValue(TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
+ .build();
+
+ static final PropertyDescriptor DEPENDENT_CHARSET = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CHARSET)
+ .dependsOn(TRANSMISSION_STRATEGY,
TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
+ .build();
+
+ static final PropertyDescriptor DEPENDENT_OUTGOING_MESSAGE_DELIMITER = new
PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(OUTGOING_MESSAGE_DELIMITER)
+ .dependsOn(TRANSMISSION_STRATEGY,
TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
+ .build();
+
+ static final PropertyDescriptor RECORD_READER = new
PropertyDescriptor.Builder()
+ .name("Record Reader")
+ .displayName("Record Reader")
+ .description("Specifies the Controller Service to use for reading
Records from input FlowFiles")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .required(true)
+ .dependsOn(TRANSMISSION_STRATEGY,
TransmissionStrategy.RECORD_ORIENTED.getValue())
+ .build();
+
+ static final PropertyDescriptor RECORD_WRITER = new
PropertyDescriptor.Builder()
+ .name("Record Writer")
+ .displayName("Record Writer")
+ .description("Specifies the Controller Service to use for writing
Records to the configured socket address")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .required(true)
+ .dependsOn(TRANSMISSION_STRATEGY,
TransmissionStrategy.RECORD_ORIENTED.getValue())
+ .build();
+
+ private static final List<PropertyDescriptor> ADDITIONAL_PROPERTIES =
Collections.unmodifiableList(Arrays.asList(
+ CONNECTION_PER_FLOWFILE,
+ SSL_CONTEXT_SERVICE,
+ TRANSMISSION_STRATEGY,
+ DEPENDENT_OUTGOING_MESSAGE_DELIMITER,
+ DEPENDENT_CHARSET,
+ RECORD_READER,
+ RECORD_WRITER
+ ));
+
@Override
protected List<PropertyDescriptor> getAdditionalProperties() {
- return Arrays.asList(CONNECTION_PER_FLOWFILE,
- OUTGOING_MESSAGE_DELIMITER,
- TIMEOUT,
- SSL_CONTEXT_SERVICE,
- CHARSET);
+ return ADDITIONAL_PROPERTIES;
}
@Override
@@ -70,22 +128,21 @@ public class PutTCP extends
AbstractPutEventProcessor<InputStream> {
return;
}
+ final TransmissionStrategy transmissionStrategy =
TransmissionStrategy.valueOf(context.getProperty(TRANSMISSION_STRATEGY).getValue());
final StopWatch stopWatch = new StopWatch(true);
try {
- session.read(flowFile, inputStream -> {
- InputStream inputStreamEvent = inputStream;
+ final int recordCount;
+ if (TransmissionStrategy.RECORD_ORIENTED == transmissionStrategy) {
+ recordCount = sendRecords(context, session, flowFile);
- final String delimiter = getOutgoingMessageDelimiter(context,
flowFile);
- if (delimiter != null) {
- final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
- inputStreamEvent = new DelimitedInputStream(inputStream,
delimiter.getBytes(charSet));
- }
-
- eventSender.sendEvent(inputStreamEvent);
- });
+ } else {
+ sendFlowFile(context, session, flowFile);
+ recordCount = 0;
+ }
- session.getProvenanceReporter().send(flowFile, transitUri,
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- session.transfer(flowFile, REL_SUCCESS);
+ final FlowFile processedFlowFile = session.putAttribute(flowFile,
RECORD_COUNT_TRANSMITTED, Integer.toString(recordCount));
+ session.getProvenanceReporter().send(processedFlowFile,
transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.transfer(processedFlowFile, REL_SUCCESS);
session.commitAsync();
} catch (final Exception e) {
getLogger().error("Send Failed {}", flowFile, e);
@@ -104,4 +161,64 @@ public class PutTCP extends
AbstractPutEventProcessor<InputStream> {
protected NettyEventSenderFactory<InputStream>
getNettyEventSenderFactory(final String hostname, final int port, final String
protocol) {
return new StreamingNettyEventSenderFactory(getLogger(), hostname,
port, TransportProtocol.TCP);
}
+
+ private void sendFlowFile(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile) {
+ session.read(flowFile, inputStream -> {
+ InputStream inputStreamEvent = inputStream;
+
+ final String delimiter = getOutgoingMessageDelimiter(context,
flowFile);
+ if (delimiter != null) {
+ final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
+ inputStreamEvent = new DelimitedInputStream(inputStream,
delimiter.getBytes(charSet));
+ }
+
+ eventSender.sendEvent(inputStreamEvent);
+ });
+ }
+
+ private int sendRecords(final ProcessContext context, final ProcessSession
session, final FlowFile flowFile) {
+ final AtomicInteger recordCount = new AtomicInteger();
+
+ final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory writerFactory =
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+ session.read(flowFile, inputStream -> {
+ try (
+ RecordReader recordReader =
readerFactory.createRecordReader(flowFile, inputStream, getLogger());
+ ReusableByteArrayInputStream eventInputStream = new
ReusableByteArrayInputStream();
+ ByteArrayOutputStream outputStream = new
ByteArrayOutputStream();
+ RecordSetWriter recordSetWriter =
writerFactory.createWriter(getLogger(), recordReader.getSchema(), outputStream,
flowFile)
+ ) {
+ Record record;
+ while ((record = recordReader.nextRecord()) != null) {
+ recordSetWriter.write(record);
+ recordSetWriter.flush();
+
+ final byte[] buffer = outputStream.toByteArray();
+ eventInputStream.setBuffer(buffer);
+ eventSender.sendEvent(eventInputStream);
+ outputStream.reset();
+
+ recordCount.getAndIncrement();
+ }
+ } catch (final SchemaNotFoundException | MalformedRecordException
e) {
+ throw new IOException("Record reading failed", e);
+ }
+ });
+
+ return recordCount.get();
+ }
+
+ private static class ReusableByteArrayInputStream extends
ByteArrayInputStream {
+
+ private ReusableByteArrayInputStream() {
+ super(new byte[0]);
+ }
+
+ private void setBuffer(final byte[] buffer) {
+ this.buf = buffer;
+ this.pos = 0;
+ this.count = buffer.length;
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/property/TransmissionStrategy.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/property/TransmissionStrategy.java
new file mode 100644
index 0000000000..95be35c519
--- /dev/null
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/property/TransmissionStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Transmission Strategy enumeration of allowable values for component
Property Descriptors
+ */
+public enum TransmissionStrategy implements DescribedValue {
+ FLOWFILE_ORIENTED("FlowFile-oriented", "Send FlowFile content as a single
stream"),
+
+ RECORD_ORIENTED("Record-oriented", "Read Records from input FlowFiles and
send serialized Records as individual messages");
+
+ private final String displayName;
+
+ private final String description;
+
+ TransmissionStrategy(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index 1742855705..56d3414f82 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -18,15 +18,26 @@
package org.apache.nifi.processors.standard;
import org.apache.nifi.event.transport.EventServer;
-import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processors.standard.property.TransmissionStrategy;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
@@ -34,28 +45,41 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
@Timeout(30)
+@ExtendWith(MockitoExtension.class)
public class TestPutTCP {
private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
private final static String SERVER_VARIABLE = "server.address";
private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE
+ "}";
- private final static int MIN_INVALID_PORT = 0;
- private final static int MIN_VALID_PORT = 1;
- private final static int MAX_VALID_PORT = 65535;
- private final static int MAX_INVALID_PORT = 65536;
private final static int VALID_LARGE_FILE_SIZE = 32768;
private final static int VALID_SMALL_FILE_SIZE = 64;
private final static int LOAD_TEST_ITERATIONS = 500;
@@ -68,6 +92,26 @@ public class TestPutTCP {
private final static String[] EMPTY_FILE = { "" };
private final static String[] VALID_FILES = {
"abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678",
"343424222", "!@£$%^&*()_+:|{}[];\\" };
+ private static final String WRITER_SERVICE_ID =
RecordSetWriterFactory.class.getSimpleName();
+
+ private static final String READER_SERVICE_ID =
RecordReaderFactory.class.getSimpleName();
+
+ private static final String RECORD = String.class.getSimpleName();
+
+ private static final Record NULL_RECORD = null;
+
+ @Mock
+ private RecordSetWriterFactory writerFactory;
+
+ @Mock
+ private RecordReaderFactory readerFactory;
+
+ @Mock
+ private RecordReader recordReader;
+
+ @Mock
+ private Record record;
+
private EventServer eventServer;
private TestRunner runner;
private BlockingQueue<ByteArrayMessage> messages;
@@ -194,6 +238,55 @@ public class TestPutTCP {
assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
}
+ @Test
+ void testRunSuccessRecordOriented() throws Exception {
+ createTestServer(OUTGOING_MESSAGE_DELIMITER);
+ runner.setProperty(PutTCP.HOSTNAME, TCP_SERVER_ADDRESS);
+ runner.setProperty(PutTCP.PORT,
String.valueOf(eventServer.getListeningPort()));
+
+ runner.setProperty(PutTCP.TRANSMISSION_STRATEGY,
TransmissionStrategy.RECORD_ORIENTED.getValue());
+
+ when(writerFactory.getIdentifier()).thenReturn(WRITER_SERVICE_ID);
+ runner.addControllerService(WRITER_SERVICE_ID, writerFactory);
+ runner.enableControllerService(writerFactory);
+ runner.setProperty(PutTCP.RECORD_WRITER, WRITER_SERVICE_ID);
+
+ when(readerFactory.getIdentifier()).thenReturn(READER_SERVICE_ID);
+ runner.addControllerService(READER_SERVICE_ID, readerFactory);
+ runner.enableControllerService(readerFactory);
+ runner.setProperty(PutTCP.RECORD_READER, READER_SERVICE_ID);
+
+ when(readerFactory.createRecordReader(any(), any(),
any())).thenReturn(recordReader);
+ when(recordReader.nextRecord()).thenReturn(record, NULL_RECORD);
+ when(writerFactory.createWriter(any(), any(), any(OutputStream.class),
any(FlowFile.class))).thenAnswer((Answer<RecordSetWriter>) invocationOnMock -> {
+ final OutputStream outputStream = invocationOnMock.getArgument(2,
OutputStream.class);
+ return new TestPutTCP.MockRecordSetWriter(outputStream);
+ });
+
+ runner.enqueue(RECORD);
+ runner.run();
+
+ runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
+
+ final Iterator<MockFlowFile> successFlowFiles =
runner.getFlowFilesForRelationship(PutTCP.REL_SUCCESS).iterator();
+ assertTrue(successFlowFiles.hasNext(), "Success FlowFiles not found");
+ final MockFlowFile successFlowFile = successFlowFiles.next();
+ successFlowFile.assertAttributeEquals(PutTCP.RECORD_COUNT_TRANSMITTED,
Integer.toString(1));
+
+ final List<ProvenanceEventRecord> provenanceEventRecords =
runner.getProvenanceEvents();
+ final Optional<ProvenanceEventRecord> sendEventFound =
provenanceEventRecords.stream()
+ .filter(eventRecord -> ProvenanceEventType.SEND ==
eventRecord.getEventType())
+ .findFirst();
+ assertTrue(sendEventFound.isPresent(), "Provenance Send Event not
found");
+ final ProvenanceEventRecord sendEventRecord = sendEventFound.get();
+
assertTrue(sendEventRecord.getTransitUri().contains(TCP_SERVER_ADDRESS),
"Transit URI not matched");
+
+ final ByteArrayMessage message = messages.take();
+ assertNotNull(message);
+ assertArrayEquals(RECORD.getBytes(StandardCharsets.UTF_8),
message.getMessage());
+ assertEquals(TCP_SERVER_ADDRESS, message.getSender());
+ }
+
private void createTestServer(final String delimiter) throws
UnknownHostException {
createTestServer(null, delimiter);
}
@@ -206,7 +299,7 @@ public class TestPutTCP {
if (sslContext != null) {
serverFactory.setSslContext(sslContext);
}
-
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+ serverFactory.setShutdownQuietPeriod(Duration.ZERO);
serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
eventServer = serverFactory.getEventServer();
}
@@ -270,12 +363,52 @@ public class TestPutTCP {
private String[] createContent(final int size) {
final char[] content = new char[size];
+ Arrays.fill(content, CONTENT_CHAR);
+ return new String[] { new String(content) };
+ }
+
+ private static class MockRecordSetWriter implements RecordSetWriter {
+ private final OutputStream outputStream;
- for (int i = 0; i < size; i++) {
- content[i] = CONTENT_CHAR;
+ private MockRecordSetWriter(final OutputStream outputStream) {
+ this.outputStream = outputStream;
}
- return new String[] { new String(content) };
- }
+ @Override
+ public WriteResult write(final RecordSet recordSet) {
+ return WriteResult.EMPTY;
+ }
+
+ @Override
+ public void beginRecordSet() {
+
+ }
+
+ @Override
+ public WriteResult finishRecordSet() {
+ return WriteResult.EMPTY;
+ }
+ @Override
+ public WriteResult write(Record record) throws IOException {
+ outputStream.write(RECORD.getBytes(StandardCharsets.UTF_8));
+
outputStream.write(OUTGOING_MESSAGE_DELIMITER.getBytes(StandardCharsets.UTF_8));
+ return WriteResult.of(1, Collections.emptyMap());
+ }
+
+ @Override
+ public String getMimeType() {
+ return null;
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+ }
}