This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit e40b14e37ff04e7f2f947f78a57fc50184379406 Author: Peter Gyori <[email protected]> AuthorDate: Fri Mar 25 18:36:43 2022 +0100 NIFI-9838: Added Client Certificate attributes to ListenTCPRecord FlowFiles This closes #5908 Signed-off-by: David Handermann <[email protected]> --- .../listen/SSLSocketChannelRecordReader.java | 10 ++++- .../SocketChannelRecordReaderDispatcher.java | 2 +- .../nifi/processors/standard/ListenTCPRecord.java | 43 ++++++++++++++++++++-- .../processors/standard/TestListenTCPRecord.java | 15 ++++++++ 4 files changed, 65 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java index f393419e33..39710a00b7 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java @@ -24,6 +24,8 @@ import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLSession; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; @@ -40,17 +42,20 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader { private final SSLSocketChannel sslSocketChannel; private final RecordReaderFactory readerFactory; private final SocketChannelRecordReaderDispatcher dispatcher; + private final SSLEngine sslEngine; private RecordReader recordReader; public SSLSocketChannelRecordReader(final SocketChannel socketChannel, final SSLSocketChannel sslSocketChannel, final RecordReaderFactory readerFactory, - final SocketChannelRecordReaderDispatcher dispatcher) { + final SocketChannelRecordReaderDispatcher dispatcher, + final SSLEngine sslEngine) { this.socketChannel = socketChannel; this.sslSocketChannel = sslSocketChannel; this.readerFactory = readerFactory; this.dispatcher = dispatcher; + this.sslEngine = sslEngine; } @Override @@ -87,4 +92,7 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader { dispatcher.connectionCompleted(); } + public SSLSession getSession() { + return sslEngine.getSession(); + } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java index 2c7c93a433..48459ce3d3 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReaderDispatcher.java @@ -119,7 +119,7 @@ public class SocketChannelRecordReaderDispatcher implements Runnable, Closeable } final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel); - socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this); + socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this, sslEngine); } // queue the SocketChannelRecordReader for processing by the processor diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java index acc936a5a9..bb7a2bfb24 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java @@ -25,6 +25,8 @@ import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.SocketTimeoutException; import java.nio.channels.ServerSocketChannel; +import java.security.cert.Certificate; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -37,6 +39,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; + import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -62,6 +67,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.listen.ListenerProperties; +import org.apache.nifi.record.listen.SSLSocketChannelRecordReader; import org.apache.nifi.record.listen.SocketChannelRecordReader; import org.apache.nifi.record.listen.SocketChannelRecordReaderDispatcher; import org.apache.nifi.security.util.ClientAuth; @@ -87,14 +93,26 @@ import org.apache.nifi.ssl.SSLContextService; "If the read times out, or if any other error is encountered when reading, the connection will be closed, and any records " + "read up to that point will be handled according to the configured Read Error Strategy (Discard or Transfer). In cases where " + "clients are keeping a connection open, the concurrent tasks for the processor should be adjusted to match the Max Number of " + - "TCP Connections allowed, so that there is a task processing each connection.") + "TCP Connections allowed, so that there is a task processing each connection. " + + "The processor can be configured to use an SSL Context Service to only allow secure connections. " + + "When connected clients present certificates for mutual TLS authentication, the Distinguished Names of the client certificate's " + + "issuer and subject are added to the outgoing FlowFiles as attributes. " + + "The processor does not perform authorization based on Distinguished Name values, but since these values " + + "are attached to the outgoing FlowFiles, authorization can be implemented based on these attributes.") @WritesAttributes({ @WritesAttribute(attribute="tcp.sender", description="The host that sent the data."), @WritesAttribute(attribute="tcp.port", description="The port that the processor accepted the connection on."), @WritesAttribute(attribute="record.count", description="The number of records written to the flow file."), - @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file.") + @WritesAttribute(attribute="mime.type", description="The mime-type of the writer used to write the records to the flow file."), + @WritesAttribute(attribute="client.certificate.issuer.dn", description="For connections using mutual TLS, the Distinguished Name of the " + + "Certificate Authority that issued the client's certificate " + + "is attached to the FlowFile."), + @WritesAttribute(attribute="client.certificate.subject.dn", description="For connections using mutual TLS, the Distinguished Name of the " + + "client certificate's owner (subject) is attached to the FlowFile.") }) public class ListenTCPRecord extends AbstractProcessor { + private static final String CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE = "client.certificate.subject.dn"; + private static final String CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE = "client.certificate.issuer.dn"; static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() .name("port") @@ -199,7 +217,6 @@ public class ListenTCPRecord extends AbstractProcessor { .description("Messages received successfully will be sent out this relationship.") .build(); - static final List<PropertyDescriptor> PROPERTIES; static { final List<PropertyDescriptor> props = new ArrayList<>(); @@ -427,6 +444,7 @@ public class ListenTCPRecord extends AbstractProcessor { attributes.put("tcp.sender", sender); attributes.put("tcp.port", String.valueOf(port)); attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + addClientCertificateAttributes(attributes, socketRecordReader); flowFile = session.putAllAttributes(flowFile, attributes); final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; @@ -460,4 +478,23 @@ public class ListenTCPRecord extends AbstractProcessor { private String getRemoteAddress(final SocketChannelRecordReader socketChannelRecordReader) { return socketChannelRecordReader.getRemoteAddress() == null ? "null" : socketChannelRecordReader.getRemoteAddress().toString(); } + + private void addClientCertificateAttributes(final Map<String, String> attributes, final SocketChannelRecordReader socketRecordReader) + throws SSLPeerUnverifiedException { + if (socketRecordReader instanceof SSLSocketChannelRecordReader) { + SSLSocketChannelRecordReader sslSocketRecordReader = (SSLSocketChannelRecordReader) socketRecordReader; + SSLSession sslSession = sslSocketRecordReader.getSession(); + try { + Certificate[] certificates = sslSession.getPeerCertificates(); + if (certificates.length > 0) { + X509Certificate certificate = (X509Certificate) certificates[0]; + attributes.put(CLIENT_CERTIFICATE_SUBJECT_DN_ATTRIBUTE, certificate.getSubjectDN().toString()); + attributes.put(CLIENT_CERTIFICATE_ISSUER_DN_ATTRIBUTE, certificate.getIssuerDN().toString()); + } + } catch (SSLPeerUnverifiedException peerUnverifiedException) { + getLogger().debug("Remote Peer [{}] not verified: client certificates not provided", + socketRecordReader.getRemoteAddress(), peerUnverifiedException); + } + } + } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java index 325b951014..731902d936 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCPRecord.java @@ -172,6 +172,21 @@ public class TestListenTCPRecord { Assert.assertTrue(content.contains("This is a test " + 3)); } + @Test(timeout = TEST_TIMEOUT) + public void testRunSSLClientDNsAddedAsAttributes() throws InitializationException, IOException, InterruptedException { + runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name()); + enableSslContextService(keyStoreSslContext); + + run(1, keyStoreSslContext); + + final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS); + Assert.assertEquals(1, mockFlowFiles.size()); + + final MockFlowFile flowFile = mockFlowFiles.get(0); + flowFile.assertAttributeEquals("client.certificate.subject.dn", "CN=localhost"); + flowFile.assertAttributeEquals("client.certificate.issuer.dn", "CN=localhost"); + } + @Test(timeout = TEST_TIMEOUT) public void testRunClientAuthNone() throws InitializationException, IOException, InterruptedException { runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
