This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 51e7bdf  NIFI-3669 Add SSL Support to CaptureChangeMySQL
51e7bdf is described below

commit 51e7bdf94088215a89845f9fdc22d15597445a2c
Author: exceptionfactory <[email protected]>
AuthorDate: Sat Oct 10 14:30:58 2020 -0400

    NIFI-3669 Add SSL Support to CaptureChangeMySQL
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4594.
---
 .../nifi-cdc-mysql-processors/pom.xml              |  14 +-
 .../cdc/mysql/processors/CaptureChangeMySQL.java   | 150 ++++++++++++++++-----
 .../processors/ssl/BinaryLogSSLSocketFactory.java  |  56 ++++++++
 .../apache/nifi/cdc/mysql/MockBinlogClient.groovy  |   8 ++
 .../mysql/processors/CaptureChangeMySQLTest.groovy |  63 ++++++++-
 .../ssl/BinaryLogSSLSocketFactoryTest.java         |  67 +++++++++
 6 files changed, 313 insertions(+), 45 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
index 5f57275..a0ad68e 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
@@ -45,24 +45,16 @@ language governing permissions and limitations under the 
License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-mock</artifactId>
-            <version>1.14.0-SNAPSHOT</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-distributed-cache-client-service</artifactId>
-            <scope>test</scope>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-ssl-context-service</artifactId>
-            <version>1.14.0-SNAPSHOT</version>
+            <artifactId>nifi-distributed-cache-client-service</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 3800ebb..a7d9e30 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -25,6 +25,38 @@ import com.github.shyiko.mysql.binlog.event.GtidEventData;
 import com.github.shyiko.mysql.binlog.event.QueryEventData;
 import com.github.shyiko.mysql.binlog.event.RotateEventData;
 import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -60,8 +92,12 @@ import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
 import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
 import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
 import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.components.state.StateMap;
@@ -78,37 +114,9 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverManager;
-import java.sql.DriverPropertyInfo;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
 import static com.github.shyiko.mysql.binlog.event.EventType.DELETE_ROWS;
 import static com.github.shyiko.mysql.binlog.event.EventType.EXT_DELETE_ROWS;
 import static com.github.shyiko.mysql.binlog.event.EventType.EXT_WRITE_ROWS;
@@ -118,7 +126,6 @@ import static 
com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
 import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
 import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
 
-
 /**
  * A processor to retrieve Change Data Capture (CDC) events and send them as 
flow files.
  */
@@ -151,6 +158,22 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
     protected static Set<Relationship> relationships;
 
+    private static final AllowableValue SSL_MODE_DISABLED = new 
AllowableValue(SSLMode.DISABLED.toString(),
+            SSLMode.DISABLED.toString(),
+            "Connect without TLS");
+
+    private static final AllowableValue SSL_MODE_PREFERRED = new 
AllowableValue(SSLMode.PREFERRED.toString(),
+            SSLMode.PREFERRED.toString(),
+            "Connect with TLS when server support enabled, otherwise connect 
without TLS");
+
+    private static final AllowableValue SSL_MODE_REQUIRED = new 
AllowableValue(SSLMode.REQUIRED.toString(),
+            SSLMode.REQUIRED.toString(),
+            "Connect with TLS or fail when server support not enabled");
+
+    private static final AllowableValue SSL_MODE_VERIFY_IDENTITY = new 
AllowableValue(SSLMode.VERIFY_IDENTITY.toString(),
+            SSLMode.VERIFY_IDENTITY.toString(),
+            "Connect with TLS or fail when server support not enabled. Verify 
server hostname matches presented X.509 certificate names or fail when not 
matched");
+
     // Properties
     public static final PropertyDescriptor DATABASE_NAME_PATTERN = new 
PropertyDescriptor.Builder()
             .name("capture-change-mysql-db-name-pattern")
@@ -368,6 +391,30 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor SSL_MODE = new 
PropertyDescriptor.Builder()
+            .name("SSL Mode")
+            .displayName("SSL Mode")
+            .description("SSL Mode used when SSL Context Service configured 
supporting certificate verification options")
+            .required(true)
+            .defaultValue(SSLMode.DISABLED.toString())
+            .allowableValues(SSL_MODE_DISABLED,
+                    SSL_MODE_PREFERRED,
+                    SSL_MODE_REQUIRED,
+                    SSL_MODE_VERIFY_IDENTITY)
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .displayName("SSL Context Service")
+            .description("SSL Context Service supporting encrypted socket 
communication")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .dependsOn(SSL_MODE,
+                    SSL_MODE_PREFERRED,
+                    SSL_MODE_REQUIRED,
+                    SSL_MODE_VERIFY_IDENTITY)
+            .build();
+
     private static final List<PropertyDescriptor> propDescriptors;
 
     private volatile ProcessSession currentSession;
@@ -445,6 +492,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         pds.add(INIT_BINLOG_POSITION);
         pds.add(USE_BINLOG_GTID);
         pds.add(INIT_BINLOG_GTID);
+        pds.add(SSL_MODE);
+        pds.add(SSL_CONTEXT_SERVICE);
         propDescriptors = Collections.unmodifiableList(pds);
     }
 
@@ -459,6 +508,31 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         return propDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new ArrayList<>();
+
+        final Map<PropertyDescriptor, String> properties = 
validationContext.getProperties();
+
+        final String sslContextServiceProperty = 
properties.get(SSL_CONTEXT_SERVICE);
+        final String sslModeProperty = properties.get(SSL_MODE);
+        if (StringUtils.isBlank(sslModeProperty) || 
SSLMode.DISABLED.toString().equals(sslModeProperty)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(SSL_MODE.getDisplayName())
+                    .valid(true)
+                    .build());
+        } else if (StringUtils.isBlank(sslContextServiceProperty)) {
+            final String explanation = String.format("SSL Context Service is 
required for SSL Mode [%s]", sslModeProperty);
+            results.add(new ValidationResult.Builder()
+                    .subject(SSL_CONTEXT_SERVICE.getDisplayName())
+                    .valid(false)
+                    .explanation(explanation)
+                    .build());
+        }
+
+        return results;
+    }
+
     @OnPrimaryNodeStateChange
     public synchronized void onPrimaryNodeChange(final PrimaryNodeState state) 
throws CDCException {
         if (state == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
@@ -558,6 +632,9 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
             cacheClient = null;
         }
 
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE)
+                .asControllerService(SSLContextService.class);
+        final SSLMode sslMode = 
SSLMode.valueOf(context.getProperty(SSL_MODE).getValue());
 
         // Save off MySQL cluster and JDBC driver information, will be used to 
connect for event enrichment as well as for the binlog connector
         try {
@@ -578,7 +655,7 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
 
             Long serverId = 
context.getProperty(SERVER_ID).evaluateAttributeExpressions().asLong();
 
-            connect(hosts, username, password, serverId, 
createEnrichmentConnection, driverLocation, driverName, connectTimeout);
+            connect(hosts, username, password, serverId, 
createEnrichmentConnection, driverLocation, driverName, connectTimeout, 
sslContextService, sslMode);
         } catch (IOException | IllegalStateException e) {
             context.yield();
             binlogClient = null;
@@ -679,7 +756,8 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
     }
 
     protected void connect(List<InetSocketAddress> hosts, String username, 
String password, Long serverId, boolean createEnrichmentConnection,
-                           String driverLocation, String driverName, long 
connectTimeout) throws IOException {
+                           String driverLocation, String driverName, long 
connectTimeout,
+                           final SSLContextService sslContextService, final 
SSLMode sslMode) throws IOException {
 
         int connectionAttempts = 0;
         final int numHosts = hosts.size();
@@ -730,6 +808,13 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
                 binlogClient.setServerId(serverId);
             }
 
+            binlogClient.setSSLMode(sslMode);
+            if (sslContextService != null) {
+                final SSLContext sslContext = 
sslContextService.createContext();
+                final BinaryLogSSLSocketFactory sslSocketFactory = new 
BinaryLogSSLSocketFactory(sslContext.getSocketFactory());
+                binlogClient.setSslSocketFactory(sslSocketFactory);
+            }
+
             try {
                 if (connectTimeout == 0) {
                     connectTimeout = Long.MAX_VALUE;
@@ -1243,4 +1328,5 @@ public class CaptureChangeMySQL extends 
AbstractSessionFactoryProcessor {
         }
 
     }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactory.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactory.java
new file mode 100644
index 0000000..af73f63
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.processors.ssl;
+
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Binary Log SSLSocketFactory wrapping standard Java SSLSocketFactory
+ */
+public class BinaryLogSSLSocketFactory implements SSLSocketFactory {
+    private static final boolean AUTO_CLOSE_ENABLED = true;
+
+    private final javax.net.ssl.SSLSocketFactory sslSocketFactory;
+
+    public BinaryLogSSLSocketFactory(final javax.net.ssl.SSLSocketFactory 
sslSocketFactory) {
+        this.sslSocketFactory = sslSocketFactory;
+    }
+
+    /**
+     * Create SSL Socket layers provided Socket using Java SSLSocketFactory
+     *
+     * @param socket Socket to be layered
+     * @return SSL Socket
+     * @throws SocketException Thrown when IOException encountered from 
SSLSocketFactory.createSocket()
+     */
+    @Override
+    public SSLSocket createSocket(final Socket socket) throws SocketException {
+        final String hostAddress = socket.getInetAddress().getHostAddress();
+        final int port = socket.getPort();
+        try {
+            return (SSLSocket) sslSocketFactory.createSocket(socket, 
hostAddress, port, AUTO_CLOSE_ENABLED);
+        } catch (final IOException e) {
+            final String message = String.format("Create SSL Socket Failed for 
Host Address [%s] Port [%d]: %s", hostAddress, port, e.getMessage());
+            throw new SocketException(message);
+        }
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
index ef50bfa..494c8f8 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
@@ -18,6 +18,7 @@ package org.apache.nifi.cdc.mysql
 
 import com.github.shyiko.mysql.binlog.BinaryLogClient
 import com.github.shyiko.mysql.binlog.event.Event
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory
 
 import java.util.concurrent.TimeoutException
 
@@ -38,6 +39,7 @@ class MockBinlogClient extends BinaryLogClient {
     List<BinaryLogClient.EventListener> eventListeners = []
     List<BinaryLogClient.LifecycleListener> lifecycleListeners = []
 
+    SSLSocketFactory sslSocketFactory
 
     MockBinlogClient(String hostname, int port, String username, String 
password) {
         super(hostname, port, username, password)
@@ -91,6 +93,12 @@ class MockBinlogClient extends BinaryLogClient {
         lifecycleListeners.remove lifecycleListener
     }
 
+    @Override
+    void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
+        super.setSslSocketFactory(sslSocketFactory)
+        this.sslSocketFactory = sslSocketFactory
+    }
+
     def sendEvent(Event event) {
         eventListeners.each { it.onEvent(event) }
     }
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index c44cc27..7e6aca0 100644
--- 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -28,10 +28,12 @@ import com.github.shyiko.mysql.binlog.event.RotateEventData
 import com.github.shyiko.mysql.binlog.event.TableMapEventData
 import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
 import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
+import com.github.shyiko.mysql.binlog.network.SSLMode
 import groovy.json.JsonSlurper
 import org.apache.commons.io.output.WriterOutputStream
 import org.apache.nifi.cdc.mysql.MockBinlogClient
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
+import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
 import org.apache.nifi.components.PropertyDescriptor
 import org.apache.nifi.components.state.Scope
 import org.apache.nifi.controller.AbstractControllerService
@@ -49,6 +51,7 @@ import org.apache.nifi.cdc.event.io.EventWriter
 import org.apache.nifi.processor.exception.ProcessException
 import org.apache.nifi.provenance.ProvenanceEventType
 import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.ssl.SSLContextService
 import org.apache.nifi.state.MockStateManager
 import org.apache.nifi.util.MockComponentLog
 import org.apache.nifi.util.MockControllerServiceInitializationContext
@@ -58,6 +61,7 @@ import org.junit.After
 import org.junit.Before
 import org.junit.Test
 
+import javax.net.ssl.SSLContext
 import java.sql.Connection
 import java.sql.ResultSet
 import java.sql.SQLException
@@ -67,8 +71,10 @@ import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import static org.junit.Assert.assertEquals
+import static org.junit.Assert.assertNotNull
 import static org.junit.Assert.assertTrue
 import static org.mockito.ArgumentMatchers.anyString
+import static org.mockito.Mockito.doReturn
 import static org.mockito.Mockito.mock
 import static org.mockito.Mockito.when
 
@@ -93,6 +99,59 @@ class CaptureChangeMySQLTest {
     }
 
     @Test
+    void testSslModeDisabledSslContextServiceNotRequired() {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, 
SSLMode.DISABLED.toString())
+        testRunner.assertValid()
+    }
+
+    @Test
+    void testSslModeRequiredSslContextServiceRequired() {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, 
SSLMode.REQUIRED.toString())
+        testRunner.assertNotValid()
+    }
+
+    @Test
+    void testSslModeRequiredSslContextServiceConfigured() {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, 
SSLMode.REQUIRED.toString())
+
+        def identifier = SSLContextService.class.getName()
+        def sslContextService = mock(SSLContextService.class)
+        when(sslContextService.getIdentifier()).thenReturn(identifier)
+        testRunner.addControllerService(identifier, sslContextService)
+        testRunner.enableControllerService(sslContextService)
+
+        testRunner.setProperty(CaptureChangeMySQL.SSL_CONTEXT_SERVICE, 
identifier)
+        testRunner.assertValid()
+    }
+
+    @Test
+    void testSslModeRequiredSslContextServiceConnected() {
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        def sslMode = SSLMode.REQUIRED
+        testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, sslMode.toString())
+
+        def sslContext = SSLContext.getDefault()
+        def identifier = SSLContextService.class.getName()
+        def sslContextService = mock(SSLContextService.class)
+        when(sslContextService.getIdentifier()).thenReturn(identifier)
+        doReturn(sslContext).when(sslContextService).createContext()
+
+        testRunner.addControllerService(identifier, sslContextService)
+        testRunner.enableControllerService(sslContextService)
+        testRunner.setProperty(CaptureChangeMySQL.SSL_CONTEXT_SERVICE, 
identifier)
+        testRunner.assertValid()
+
+        testRunner.run()
+        assertEquals("SSL Mode not matched", sslMode, client.getSSLMode())
+        def sslSocketFactory = client.sslSocketFactory
+        assertNotNull('Binary Log SSLSocketFactory not found', 
sslSocketFactory)
+        assertEquals('Binary Log SSLSocketFactory class not matched', 
BinaryLogSSLSocketFactory.class, sslSocketFactory.getClass())
+    }
+
+    @Test
     void testConnectionFailures() throws Exception {
         testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 
'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
         testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
@@ -903,8 +962,8 @@ class CaptureChangeMySQLTest {
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, 
'-1000', Scope.CLUSTER)
         
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
 
-        ((CaptureChangeMySQL) testRunner.getProcessor()).clearState();
-        testRunner.stateManager.clear(Scope.CLUSTER);
+        ((CaptureChangeMySQL) testRunner.getProcessor()).clearState()
+        testRunner.stateManager.clear(Scope.CLUSTER)
 
         // Send some events, wait for the State Update Interval, and verify 
the state was set
         testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, '1 
second')
diff --git 
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactoryTest.java
 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactoryTest.java
new file mode 100644
index 0000000..439ec06
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.processors.ssl;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BinaryLogSSLSocketFactoryTest {
+
+    private static final int PORT = 65000;
+
+    @Test
+    public void testCreateSocket() throws IOException {
+        final SSLSocketFactory sslSocketFactory = (SSLSocketFactory) 
SSLSocketFactory.getDefault();
+        final BinaryLogSSLSocketFactory socketFactory = new 
BinaryLogSSLSocketFactory(sslSocketFactory);
+
+        final Socket socket = mock(Socket.class);
+        when(socket.isConnected()).thenReturn(true);
+        final InetAddress address = InetAddress.getLoopbackAddress();
+        when(socket.getInetAddress()).thenReturn(address);
+        when(socket.getPort()).thenReturn(PORT);
+
+        final SSLSocket sslSocket = socketFactory.createSocket(socket);
+        assertNotNull("SSL Socket not found", sslSocket);
+        assertEquals("Address not matched", address, 
sslSocket.getInetAddress());
+        assertEquals("Port not matched", PORT, sslSocket.getPort());
+    }
+
+    @Test
+    public void testCreateSocketException() {
+        final SSLSocketFactory sslSocketFactory = (SSLSocketFactory) 
SSLSocketFactory.getDefault();
+        final BinaryLogSSLSocketFactory socketFactory = new 
BinaryLogSSLSocketFactory(sslSocketFactory);
+
+        final Socket socket = mock(Socket.class);
+        final InetAddress address = InetAddress.getLoopbackAddress();
+        when(socket.getInetAddress()).thenReturn(address);
+        when(socket.getPort()).thenReturn(PORT);
+
+        assertThrows(IOException.class, () -> 
socketFactory.createSocket(socket));
+    }
+}

Reply via email to