This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 19b13b060d NIFI-11889 Added Record-oriented Transmission to PutTCP
19b13b060d is described below

commit 19b13b060dd32a15e0b2be267a67c2c320ae4acd
Author: exceptionfactory <[email protected]>
AuthorDate: Mon Jul 31 15:19:55 2023 -0500

    NIFI-11889 Added Record-oriented Transmission to PutTCP
    
    This closes #7554
    Signed-off-by: Paul Grey <[email protected]>
---
 .../util/put/AbstractPutEventProcessor.java        |  37 +++--
 .../apache/nifi/processors/standard/PutTCP.java    | 163 ++++++++++++++++++---
 .../standard/property/TransmissionStrategy.java    |  52 +++++++
 .../nifi/processors/standard/TestPutTCP.java       | 153 +++++++++++++++++--
 4 files changed, 358 insertions(+), 47 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
index d9b85775a8..f2aa825b2e 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-event-put/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
@@ -54,19 +54,21 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
 
     public static final PropertyDescriptor HOSTNAME = new 
PropertyDescriptor.Builder()
             .name("Hostname")
-            .description("The ip address or hostname of the destination.")
+            .description("Destination hostname or IP address")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .defaultValue("localhost")
             .required(true)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
-    public static final PropertyDescriptor PORT = new PropertyDescriptor
-            .Builder().name("Port")
-            .description("The port on the destination.")
+
+    public static final PropertyDescriptor PORT = new 
PropertyDescriptor.Builder()
+            .name("Port")
+            .description("Destination port number")
             .required(true)
             .addValidator(StandardValidators.PORT_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
+
     public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
             .name("Max Size of Socket Send Buffer")
             .description("The maximum size of the socket send buffer that 
should be used. This is a suggestion to the Operating System " +
@@ -76,8 +78,9 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
             .defaultValue("1 MB")
             .required(true)
             .build();
-    public static final PropertyDescriptor IDLE_EXPIRATION = new 
PropertyDescriptor
-            .Builder().name("Idle Connection Expiration")
+
+    public static final PropertyDescriptor IDLE_EXPIRATION = new 
PropertyDescriptor.Builder()
+            .name("Idle Connection Expiration")
             .description("The amount of time a connection should be held open 
without being used before closing the connection. A value of 0 seconds will 
disable this feature.")
             .required(true)
             .defaultValue("15 seconds")
@@ -89,13 +92,14 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
     // not added to the properties by default since not all processors may 
need them
     public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", 
"TCP");
     public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", 
"UDP");
-    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
-            .Builder().name("Protocol")
+    public static final PropertyDescriptor PROTOCOL = new 
PropertyDescriptor.Builder()
+            .name("Protocol")
             .description("The protocol for communication.")
             .required(true)
             .allowableValues(TCP_VALUE, UDP_VALUE)
             .defaultValue(TCP_VALUE.getValue())
             .build();
+
     public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
             .name("Message Delimiter")
             .description("Specifies the delimiter to use for splitting apart 
multiple messages within a single FlowFile. "
@@ -109,6 +113,7 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
+
     public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
             .name("Character Set")
             .description("Specifies the character set of the data being sent.")
@@ -117,6 +122,7 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
+
     public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
             .name("Timeout")
             .description("The timeout for connecting to and communicating with 
the destination. Does not apply to UDP")
@@ -125,6 +131,7 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
+
     public static final PropertyDescriptor OUTGOING_MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
             .name("Outgoing Message Delimiter")
             .description("Specifies the delimiter to use when sending messages 
out over the same TCP stream. The delimiter is appended to each FlowFile 
message "
@@ -135,6 +142,7 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .build();
+
     public static final PropertyDescriptor CONNECTION_PER_FLOWFILE = new 
PropertyDescriptor.Builder()
             .name("Connection Per FlowFile")
             .description("Specifies whether to send each FlowFile's content on 
an individual connection.")
@@ -142,10 +150,10 @@ public abstract class AbstractPutEventProcessor<T> 
extends AbstractSessionFactor
             .defaultValue("false")
             .allowableValues("true", "false")
             .build();
+
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
             .name("SSL Context Service")
-            .description("The Controller Service to use in order to obtain an 
SSL Context. If this property is set, " +
-                    "messages will be sent over a secure connection.")
+            .description("Specifies the SSL Context Service to enable TLS 
socket communication")
             .required(false)
             .identifiesControllerService(SSLContextService.class)
             .build();
@@ -154,6 +162,7 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
             .name("success")
             .description("FlowFiles that are sent successfully to the 
destination are sent out this relationship.")
             .build();
+
     public static final Relationship REL_FAILURE = new Relationship.Builder()
             .name("failure")
             .description("FlowFiles that failed to send to the destination are 
sent out this relationship.")
@@ -414,14 +423,14 @@ public abstract class AbstractPutEventProcessor<T> 
extends AbstractSessionFactor
             }
 
             if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
-                getLogger().info("Completed processing {} but sent 0 
FlowFiles", new Object[] {flowFile});
+                getLogger().info("Completed processing {} but sent 0 
FlowFiles", flowFile);
                 session.transfer(flowFile, REL_SUCCESS);
                 session.commitAsync();
                 return;
             }
 
             if (successfulRanges.isEmpty()) {
-                getLogger().error("Failed to send {}; routing to 'failure'; 
last failure reason reported was {};", new Object[] {flowFile, 
lastFailureReason});
+                getLogger().error("Failed to send {}; routing to 'failure'; 
last failure reason reported was {};", flowFile, lastFailureReason);
                 final FlowFile penalizedFlowFile = session.penalize(flowFile);
                 session.transfer(penalizedFlowFile, REL_FAILURE);
                 session.commitAsync();
@@ -432,7 +441,7 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
                 final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
                 session.getProvenanceReporter().send(flowFile, transitUri, 
"Sent " + successfulRanges.size() + " messages;", transferMillis);
                 session.transfer(flowFile, REL_SUCCESS);
-                getLogger().info("Successfully sent {} messages for {} in {} 
millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
+                getLogger().info("Successfully sent {} messages for {} in {} 
millis", successfulRanges.size(), flowFile, transferMillis);
                 session.commitAsync();
                 return;
             }
@@ -444,7 +453,7 @@ public abstract class AbstractPutEventProcessor<T> extends 
AbstractSessionFactor
             transferRanges(failedRanges, REL_FAILURE);
             session.remove(flowFile);
             getLogger().error("Successfully sent {} messages, but failed to 
send {} messages; the last error received was {}",
-                    new Object[] {successfulRanges.size(), 
failedRanges.size(), lastFailureReason});
+                    successfulRanges.size(), failedRanges.size(), 
lastFailureReason);
             session.commitAsync();
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
index 639d0220ff..cc2c5d1774 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutTCP.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.standard;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -33,33 +35,89 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
+import org.apache.nifi.processors.standard.property.TransmissionStrategy;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
-@CapabilityDescription("The PutTCP processor receives a FlowFile and transmits 
the FlowFile content over a TCP connection to the configured TCP server. "
-        + "By default, the FlowFiles are transmitted over the same TCP 
connection (or pool of TCP connections if multiple input threads are 
configured). "
-        + "To assist the TCP server with determining message boundaries, an 
optional \"Outgoing Message Delimiter\" string can be configured which is 
appended "
-        + "to the end of each FlowFiles content when it is transmitted over 
the TCP connection. An optional \"Connection Per FlowFile\" parameter can be "
-        + "specified to change the behaviour so that each FlowFiles content is 
transmitted over a single TCP connection which is opened when the FlowFile "
-        + "is received and closed after the FlowFile has been sent. This 
option should only be used for low message volume scenarios, otherwise the 
platform " + "may run out of TCP sockets.")
+@CapabilityDescription("Sends serialized FlowFiles or Records over TCP to a 
configurable destination with optional support for TLS")
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @SeeAlso({ListenTCP.class, PutUDP.class})
 @Tags({ "remote", "egress", "put", "tcp" })
 @SupportsBatching
+@WritesAttributes({
+        @WritesAttribute(attribute = PutTCP.RECORD_COUNT_TRANSMITTED, 
description = "Count of records transmitted to configured destination address")
+})
 public class PutTCP extends AbstractPutEventProcessor<InputStream> {
 
+    public static final String RECORD_COUNT_TRANSMITTED = 
"record.count.transmitted";
+
+    static final PropertyDescriptor TRANSMISSION_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("Transmission Strategy")
+            .displayName("Transmission Strategy")
+            .description("Specifies the strategy used for reading input 
FlowFiles and transmitting messages to the destination socket address")
+            .required(true)
+            .allowableValues(TransmissionStrategy.class)
+            .defaultValue(TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
+            .build();
+
+    static final PropertyDescriptor DEPENDENT_CHARSET = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CHARSET)
+            .dependsOn(TRANSMISSION_STRATEGY, 
TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
+            .build();
+
+    static final PropertyDescriptor DEPENDENT_OUTGOING_MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(OUTGOING_MESSAGE_DELIMITER)
+            .dependsOn(TRANSMISSION_STRATEGY, 
TransmissionStrategy.FLOWFILE_ORIENTED.getValue())
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("Record Reader")
+            .displayName("Record Reader")
+            .description("Specifies the Controller Service to use for reading 
Records from input FlowFiles")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .dependsOn(TRANSMISSION_STRATEGY, 
TransmissionStrategy.RECORD_ORIENTED.getValue())
+            .build();
+
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("Record Writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing 
Records to the configured socket address")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .dependsOn(TRANSMISSION_STRATEGY, 
TransmissionStrategy.RECORD_ORIENTED.getValue())
+            .build();
+
+    private static final List<PropertyDescriptor> ADDITIONAL_PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            CONNECTION_PER_FLOWFILE,
+            SSL_CONTEXT_SERVICE,
+            TRANSMISSION_STRATEGY,
+            DEPENDENT_OUTGOING_MESSAGE_DELIMITER,
+            DEPENDENT_CHARSET,
+            RECORD_READER,
+            RECORD_WRITER
+    ));
+
     @Override
     protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(CONNECTION_PER_FLOWFILE,
-                OUTGOING_MESSAGE_DELIMITER,
-                TIMEOUT,
-                SSL_CONTEXT_SERVICE,
-                CHARSET);
+        return ADDITIONAL_PROPERTIES;
     }
 
     @Override
@@ -70,22 +128,21 @@ public class PutTCP extends 
AbstractPutEventProcessor<InputStream> {
             return;
         }
 
+        final TransmissionStrategy transmissionStrategy = 
TransmissionStrategy.valueOf(context.getProperty(TRANSMISSION_STRATEGY).getValue());
         final StopWatch stopWatch = new StopWatch(true);
         try {
-            session.read(flowFile, inputStream -> {
-                InputStream inputStreamEvent = inputStream;
+            final int recordCount;
+            if (TransmissionStrategy.RECORD_ORIENTED == transmissionStrategy) {
+                recordCount = sendRecords(context, session, flowFile);
 
-                final String delimiter = getOutgoingMessageDelimiter(context, 
flowFile);
-                if (delimiter != null) {
-                    final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
-                    inputStreamEvent = new DelimitedInputStream(inputStream, 
delimiter.getBytes(charSet));
-                }
-
-                eventSender.sendEvent(inputStreamEvent);
-            });
+            } else {
+                sendFlowFile(context, session, flowFile);
+                recordCount = 0;
+            }
 
-            session.getProvenanceReporter().send(flowFile, transitUri, 
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
+            final FlowFile processedFlowFile = session.putAttribute(flowFile, 
RECORD_COUNT_TRANSMITTED, Integer.toString(recordCount));
+            session.getProvenanceReporter().send(processedFlowFile, 
transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.transfer(processedFlowFile, REL_SUCCESS);
             session.commitAsync();
         } catch (final Exception e) {
             getLogger().error("Send Failed {}", flowFile, e);
@@ -104,4 +161,64 @@ public class PutTCP extends 
AbstractPutEventProcessor<InputStream> {
     protected NettyEventSenderFactory<InputStream> 
getNettyEventSenderFactory(final String hostname, final int port, final String 
protocol) {
         return new StreamingNettyEventSenderFactory(getLogger(), hostname, 
port, TransportProtocol.TCP);
     }
+
+    private void sendFlowFile(final ProcessContext context, final 
ProcessSession session, final FlowFile flowFile) {
+        session.read(flowFile, inputStream -> {
+            InputStream inputStreamEvent = inputStream;
+
+            final String delimiter = getOutgoingMessageDelimiter(context, 
flowFile);
+            if (delimiter != null) {
+                final Charset charSet = 
Charset.forName(context.getProperty(CHARSET).getValue());
+                inputStreamEvent = new DelimitedInputStream(inputStream, 
delimiter.getBytes(charSet));
+            }
+
+            eventSender.sendEvent(inputStreamEvent);
+        });
+    }
+
+    private int sendRecords(final ProcessContext context, final ProcessSession 
session, final FlowFile flowFile) {
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+
+        session.read(flowFile, inputStream -> {
+            try (
+                    RecordReader recordReader = 
readerFactory.createRecordReader(flowFile, inputStream, getLogger());
+                    ReusableByteArrayInputStream eventInputStream = new 
ReusableByteArrayInputStream();
+                    ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream();
+                    RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), recordReader.getSchema(), outputStream, 
flowFile)
+            ) {
+                Record record;
+                while ((record = recordReader.nextRecord()) != null) {
+                    recordSetWriter.write(record);
+                    recordSetWriter.flush();
+
+                    final byte[] buffer = outputStream.toByteArray();
+                    eventInputStream.setBuffer(buffer);
+                    eventSender.sendEvent(eventInputStream);
+                    outputStream.reset();
+
+                    recordCount.getAndIncrement();
+                }
+            } catch (final SchemaNotFoundException | MalformedRecordException 
e) {
+                throw new IOException("Record reading failed", e);
+            }
+        });
+
+        return recordCount.get();
+    }
+
+    private static class ReusableByteArrayInputStream extends 
ByteArrayInputStream {
+
+        private ReusableByteArrayInputStream() {
+            super(new byte[0]);
+        }
+
+        private void setBuffer(final byte[] buffer) {
+            this.buf = buffer;
+            this.pos = 0;
+            this.count = buffer.length;
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/property/TransmissionStrategy.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/property/TransmissionStrategy.java
new file mode 100644
index 0000000000..95be35c519
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/property/TransmissionStrategy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.property;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Transmission Strategy enumeration of allowable values for component 
Property Descriptors
+ */
+public enum TransmissionStrategy implements DescribedValue {
+    FLOWFILE_ORIENTED("FlowFile-oriented", "Send FlowFile content as a single 
stream"),
+
+    RECORD_ORIENTED("Record-oriented", "Read Records from input FlowFiles and 
send serialized Records as individual messages");
+
+    private final String displayName;
+
+    private final String description;
+
+    TransmissionStrategy(final String displayName, final String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
index 1742855705..56d3414f82 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutTCP.java
@@ -18,15 +18,26 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.nifi.event.transport.EventServer;
-import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
 import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
 import org.apache.nifi.event.transport.configuration.TransportProtocol;
 import org.apache.nifi.event.transport.message.ByteArrayMessage;
 import 
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
 import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processors.standard.property.TransmissionStrategy;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
 import org.apache.nifi.security.util.TlsConfiguration;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
@@ -34,28 +45,41 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
 import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.stubbing.Answer;
 
 import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
 
 @Timeout(30)
+@ExtendWith(MockitoExtension.class)
 public class TestPutTCP {
     private final static String TCP_SERVER_ADDRESS = "127.0.0.1";
     private final static String SERVER_VARIABLE = "server.address";
     private final static String TCP_SERVER_ADDRESS_EL = "${" + SERVER_VARIABLE 
+ "}";
-    private final static int MIN_INVALID_PORT = 0;
-    private final static int MIN_VALID_PORT = 1;
-    private final static int MAX_VALID_PORT = 65535;
-    private final static int MAX_INVALID_PORT = 65536;
     private final static int VALID_LARGE_FILE_SIZE = 32768;
     private final static int VALID_SMALL_FILE_SIZE = 64;
     private final static int LOAD_TEST_ITERATIONS = 500;
@@ -68,6 +92,26 @@ public class TestPutTCP {
     private final static String[] EMPTY_FILE = { "" };
     private final static String[] VALID_FILES = { 
"abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", 
"343424222", "!@£$%^&*()_+:|{}[];\\" };
 
+    private static final String WRITER_SERVICE_ID = 
RecordSetWriterFactory.class.getSimpleName();
+
+    private static final String READER_SERVICE_ID = 
RecordReaderFactory.class.getSimpleName();
+
+    private static final String RECORD = String.class.getSimpleName();
+
+    private static final Record NULL_RECORD = null;
+
+    @Mock
+    private RecordSetWriterFactory writerFactory;
+
+    @Mock
+    private RecordReaderFactory readerFactory;
+
+    @Mock
+    private RecordReader recordReader;
+
+    @Mock
+    private Record record;
+
     private EventServer eventServer;
     private TestRunner runner;
     private BlockingQueue<ByteArrayMessage> messages;
@@ -194,6 +238,55 @@ public class TestPutTCP {
         assertMessagesReceived(testData, LOAD_TEST_ITERATIONS);
     }
 
+    @Test
+    void testRunSuccessRecordOriented() throws Exception {
+        createTestServer(OUTGOING_MESSAGE_DELIMITER);
+        runner.setProperty(PutTCP.HOSTNAME, TCP_SERVER_ADDRESS);
+        runner.setProperty(PutTCP.PORT, 
String.valueOf(eventServer.getListeningPort()));
+
+        runner.setProperty(PutTCP.TRANSMISSION_STRATEGY, 
TransmissionStrategy.RECORD_ORIENTED.getValue());
+
+        when(writerFactory.getIdentifier()).thenReturn(WRITER_SERVICE_ID);
+        runner.addControllerService(WRITER_SERVICE_ID, writerFactory);
+        runner.enableControllerService(writerFactory);
+        runner.setProperty(PutTCP.RECORD_WRITER, WRITER_SERVICE_ID);
+
+        when(readerFactory.getIdentifier()).thenReturn(READER_SERVICE_ID);
+        runner.addControllerService(READER_SERVICE_ID, readerFactory);
+        runner.enableControllerService(readerFactory);
+        runner.setProperty(PutTCP.RECORD_READER, READER_SERVICE_ID);
+
+        when(readerFactory.createRecordReader(any(), any(), 
any())).thenReturn(recordReader);
+        when(recordReader.nextRecord()).thenReturn(record, NULL_RECORD);
+        when(writerFactory.createWriter(any(), any(), any(OutputStream.class), 
any(FlowFile.class))).thenAnswer((Answer<RecordSetWriter>) invocationOnMock -> {
+            final OutputStream outputStream = invocationOnMock.getArgument(2, 
OutputStream.class);
+            return new TestPutTCP.MockRecordSetWriter(outputStream);
+        });
+
+        runner.enqueue(RECORD);
+        runner.run();
+
+        runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
+
+        final Iterator<MockFlowFile> successFlowFiles = 
runner.getFlowFilesForRelationship(PutTCP.REL_SUCCESS).iterator();
+        assertTrue(successFlowFiles.hasNext(), "Success FlowFiles not found");
+        final MockFlowFile successFlowFile = successFlowFiles.next();
+        successFlowFile.assertAttributeEquals(PutTCP.RECORD_COUNT_TRANSMITTED, 
Integer.toString(1));
+
+        final List<ProvenanceEventRecord> provenanceEventRecords = 
runner.getProvenanceEvents();
+        final Optional<ProvenanceEventRecord> sendEventFound = 
provenanceEventRecords.stream()
+                .filter(eventRecord -> ProvenanceEventType.SEND == 
eventRecord.getEventType())
+                .findFirst();
+        assertTrue(sendEventFound.isPresent(), "Provenance Send Event not 
found");
+        final ProvenanceEventRecord sendEventRecord = sendEventFound.get();
+        
assertTrue(sendEventRecord.getTransitUri().contains(TCP_SERVER_ADDRESS), 
"Transit URI not matched");
+
+        final ByteArrayMessage message = messages.take();
+        assertNotNull(message);
+        assertArrayEquals(RECORD.getBytes(StandardCharsets.UTF_8), 
message.getMessage());
+        assertEquals(TCP_SERVER_ADDRESS, message.getSender());
+    }
+
     private void createTestServer(final String delimiter) throws 
UnknownHostException {
         createTestServer(null, delimiter);
     }
@@ -206,7 +299,7 @@ public class TestPutTCP {
         if (sslContext != null) {
             serverFactory.setSslContext(sslContext);
         }
-        
serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+        serverFactory.setShutdownQuietPeriod(Duration.ZERO);
         serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
         eventServer = serverFactory.getEventServer();
     }
@@ -270,12 +363,52 @@ public class TestPutTCP {
 
     private String[] createContent(final int size) {
         final char[] content = new char[size];
+        Arrays.fill(content, CONTENT_CHAR);
+        return new String[] { new String(content) };
+    }
+
+    private static class MockRecordSetWriter implements RecordSetWriter {
+        private final OutputStream outputStream;
 
-        for (int i = 0; i < size; i++) {
-            content[i] = CONTENT_CHAR;
+        private MockRecordSetWriter(final OutputStream outputStream) {
+            this.outputStream = outputStream;
         }
 
-        return new String[] { new String(content) };
-    }
+        @Override
+        public WriteResult write(final RecordSet recordSet) {
+            return WriteResult.EMPTY;
+        }
+
+        @Override
+        public void beginRecordSet() {
+
+        }
+
+        @Override
+        public WriteResult finishRecordSet() {
+            return WriteResult.EMPTY;
+        }
 
+        @Override
+        public WriteResult write(Record record) throws IOException {
+            outputStream.write(RECORD.getBytes(StandardCharsets.UTF_8));
+            
outputStream.write(OUTGOING_MESSAGE_DELIMITER.getBytes(StandardCharsets.UTF_8));
+            return WriteResult.of(1, Collections.emptyMap());
+        }
+
+        @Override
+        public String getMimeType() {
+            return null;
+        }
+
+        @Override
+        public void flush() {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
 }

Reply via email to