This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 51e7bdf NIFI-3669 Add SSL Support to CaptureChangeMySQL
51e7bdf is described below
commit 51e7bdf94088215a89845f9fdc22d15597445a2c
Author: exceptionfactory <[email protected]>
AuthorDate: Sat Oct 10 14:30:58 2020 -0400
NIFI-3669 Add SSL Support to CaptureChangeMySQL
Signed-off-by: Pierre Villard <[email protected]>
This closes #4594.
---
.../nifi-cdc-mysql-processors/pom.xml | 14 +-
.../cdc/mysql/processors/CaptureChangeMySQL.java | 150 ++++++++++++++++-----
.../processors/ssl/BinaryLogSSLSocketFactory.java | 56 ++++++++
.../apache/nifi/cdc/mysql/MockBinlogClient.groovy | 8 ++
.../mysql/processors/CaptureChangeMySQLTest.groovy | 63 ++++++++-
.../ssl/BinaryLogSSLSocketFactoryTest.java | 67 +++++++++
6 files changed, 313 insertions(+), 45 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
index 5f57275..a0ad68e 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
@@ -45,24 +45,16 @@ language governing permissions and limitations under the
License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-mock</artifactId>
- <version>1.14.0-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-distributed-cache-client-service</artifactId>
- <scope>test</scope>
+ <artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-ssl-context-service-api</artifactId>
+ <artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-ssl-context-service</artifactId>
- <version>1.14.0-SNAPSHOT</version>
+ <artifactId>nifi-distributed-cache-client-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 3800ebb..a7d9e30 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -25,6 +25,38 @@ import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
+import com.github.shyiko.mysql.binlog.network.SSLMode;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Logger;
+import java.util.regex.Pattern;
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -60,8 +92,12 @@ import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
@@ -78,37 +114,9 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.sql.Connection;
-import java.sql.Driver;
-import java.sql.DriverManager;
-import java.sql.DriverPropertyInfo;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Logger;
-import java.util.regex.Pattern;
-
import static com.github.shyiko.mysql.binlog.event.EventType.DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_WRITE_ROWS;
@@ -118,7 +126,6 @@ import static
com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
-
/**
* A processor to retrieve Change Data Capture (CDC) events and send them as
flow files.
*/
@@ -151,6 +158,22 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
protected static Set<Relationship> relationships;
+ private static final AllowableValue SSL_MODE_DISABLED = new
AllowableValue(SSLMode.DISABLED.toString(),
+ SSLMode.DISABLED.toString(),
+ "Connect without TLS");
+
+ private static final AllowableValue SSL_MODE_PREFERRED = new
AllowableValue(SSLMode.PREFERRED.toString(),
+ SSLMode.PREFERRED.toString(),
+ "Connect with TLS when server support enabled, otherwise connect
without TLS");
+
+ private static final AllowableValue SSL_MODE_REQUIRED = new
AllowableValue(SSLMode.REQUIRED.toString(),
+ SSLMode.REQUIRED.toString(),
+ "Connect with TLS or fail when server support not enabled");
+
+ private static final AllowableValue SSL_MODE_VERIFY_IDENTITY = new
AllowableValue(SSLMode.VERIFY_IDENTITY.toString(),
+ SSLMode.VERIFY_IDENTITY.toString(),
+ "Connect with TLS or fail when server support not enabled. Verify
server hostname matches presented X.509 certificate names or fail when not
matched");
+
// Properties
public static final PropertyDescriptor DATABASE_NAME_PATTERN = new
PropertyDescriptor.Builder()
.name("capture-change-mysql-db-name-pattern")
@@ -368,6 +391,30 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
+ public static final PropertyDescriptor SSL_MODE = new
PropertyDescriptor.Builder()
+ .name("SSL Mode")
+ .displayName("SSL Mode")
+ .description("SSL Mode used when SSL Context Service configured
supporting certificate verification options")
+ .required(true)
+ .defaultValue(SSLMode.DISABLED.toString())
+ .allowableValues(SSL_MODE_DISABLED,
+ SSL_MODE_PREFERRED,
+ SSL_MODE_REQUIRED,
+ SSL_MODE_VERIFY_IDENTITY)
+ .build();
+
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new
PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .displayName("SSL Context Service")
+ .description("SSL Context Service supporting encrypted socket
communication")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .dependsOn(SSL_MODE,
+ SSL_MODE_PREFERRED,
+ SSL_MODE_REQUIRED,
+ SSL_MODE_VERIFY_IDENTITY)
+ .build();
+
private static final List<PropertyDescriptor> propDescriptors;
private volatile ProcessSession currentSession;
@@ -445,6 +492,8 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
pds.add(INIT_BINLOG_POSITION);
pds.add(USE_BINLOG_GTID);
pds.add(INIT_BINLOG_GTID);
+ pds.add(SSL_MODE);
+ pds.add(SSL_CONTEXT_SERVICE);
propDescriptors = Collections.unmodifiableList(pds);
}
@@ -459,6 +508,31 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
return propDescriptors;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final
ValidationContext validationContext) {
+ final Collection<ValidationResult> results = new ArrayList<>();
+
+ final Map<PropertyDescriptor, String> properties =
validationContext.getProperties();
+
+ final String sslContextServiceProperty =
properties.get(SSL_CONTEXT_SERVICE);
+ final String sslModeProperty = properties.get(SSL_MODE);
+ if (StringUtils.isBlank(sslModeProperty) ||
SSLMode.DISABLED.toString().equals(sslModeProperty)) {
+ results.add(new ValidationResult.Builder()
+ .subject(SSL_MODE.getDisplayName())
+ .valid(true)
+ .build());
+ } else if (StringUtils.isBlank(sslContextServiceProperty)) {
+ final String explanation = String.format("SSL Context Service is
required for SSL Mode [%s]", sslModeProperty);
+ results.add(new ValidationResult.Builder()
+ .subject(SSL_CONTEXT_SERVICE.getDisplayName())
+ .valid(false)
+ .explanation(explanation)
+ .build());
+ }
+
+ return results;
+ }
+
@OnPrimaryNodeStateChange
public synchronized void onPrimaryNodeChange(final PrimaryNodeState state)
throws CDCException {
if (state == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
@@ -558,6 +632,9 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
cacheClient = null;
}
+ final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE)
+ .asControllerService(SSLContextService.class);
+ final SSLMode sslMode =
SSLMode.valueOf(context.getProperty(SSL_MODE).getValue());
// Save off MySQL cluster and JDBC driver information, will be used to
connect for event enrichment as well as for the binlog connector
try {
@@ -578,7 +655,7 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
Long serverId =
context.getProperty(SERVER_ID).evaluateAttributeExpressions().asLong();
- connect(hosts, username, password, serverId,
createEnrichmentConnection, driverLocation, driverName, connectTimeout);
+ connect(hosts, username, password, serverId,
createEnrichmentConnection, driverLocation, driverName, connectTimeout,
sslContextService, sslMode);
} catch (IOException | IllegalStateException e) {
context.yield();
binlogClient = null;
@@ -679,7 +756,8 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
protected void connect(List<InetSocketAddress> hosts, String username,
String password, Long serverId, boolean createEnrichmentConnection,
- String driverLocation, String driverName, long
connectTimeout) throws IOException {
+ String driverLocation, String driverName, long
connectTimeout,
+ final SSLContextService sslContextService, final
SSLMode sslMode) throws IOException {
int connectionAttempts = 0;
final int numHosts = hosts.size();
@@ -730,6 +808,13 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
binlogClient.setServerId(serverId);
}
+ binlogClient.setSSLMode(sslMode);
+ if (sslContextService != null) {
+ final SSLContext sslContext =
sslContextService.createContext();
+ final BinaryLogSSLSocketFactory sslSocketFactory = new
BinaryLogSSLSocketFactory(sslContext.getSocketFactory());
+ binlogClient.setSslSocketFactory(sslSocketFactory);
+ }
+
try {
if (connectTimeout == 0) {
connectTimeout = Long.MAX_VALUE;
@@ -1243,4 +1328,5 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
}
+
}
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactory.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactory.java
new file mode 100644
index 0000000..af73f63
--- /dev/null
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.processors.ssl;
+
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
+
+import javax.net.ssl.SSLSocket;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
+
+/**
+ * Binary Log SSLSocketFactory wrapping standard Java SSLSocketFactory
+ */
+public class BinaryLogSSLSocketFactory implements SSLSocketFactory {
+ private static final boolean AUTO_CLOSE_ENABLED = true;
+
+ private final javax.net.ssl.SSLSocketFactory sslSocketFactory;
+
+ public BinaryLogSSLSocketFactory(final javax.net.ssl.SSLSocketFactory
sslSocketFactory) {
+ this.sslSocketFactory = sslSocketFactory;
+ }
+
+ /**
+ * Create SSL Socket layers provided Socket using Java SSLSocketFactory
+ *
+ * @param socket Socket to be layered
+ * @return SSL Socket
+ * @throws SocketException Thrown when IOException encountered from
SSLSocketFactory.createSocket()
+ */
+ @Override
+ public SSLSocket createSocket(final Socket socket) throws SocketException {
+ final String hostAddress = socket.getInetAddress().getHostAddress();
+ final int port = socket.getPort();
+ try {
+ return (SSLSocket) sslSocketFactory.createSocket(socket,
hostAddress, port, AUTO_CLOSE_ENABLED);
+ } catch (final IOException e) {
+ final String message = String.format("Create SSL Socket Failed for
Host Address [%s] Port [%d]: %s", hostAddress, port, e.getMessage());
+ throw new SocketException(message);
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
index ef50bfa..494c8f8 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/MockBinlogClient.groovy
@@ -18,6 +18,7 @@ package org.apache.nifi.cdc.mysql
import com.github.shyiko.mysql.binlog.BinaryLogClient
import com.github.shyiko.mysql.binlog.event.Event
+import com.github.shyiko.mysql.binlog.network.SSLSocketFactory
import java.util.concurrent.TimeoutException
@@ -38,6 +39,7 @@ class MockBinlogClient extends BinaryLogClient {
List<BinaryLogClient.EventListener> eventListeners = []
List<BinaryLogClient.LifecycleListener> lifecycleListeners = []
+ SSLSocketFactory sslSocketFactory
MockBinlogClient(String hostname, int port, String username, String
password) {
super(hostname, port, username, password)
@@ -91,6 +93,12 @@ class MockBinlogClient extends BinaryLogClient {
lifecycleListeners.remove lifecycleListener
}
+ @Override
+ void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
+ super.setSslSocketFactory(sslSocketFactory)
+ this.sslSocketFactory = sslSocketFactory
+ }
+
def sendEvent(Event event) {
eventListeners.each { it.onEvent(event) }
}
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index c44cc27..7e6aca0 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -28,10 +28,12 @@ import com.github.shyiko.mysql.binlog.event.RotateEventData
import com.github.shyiko.mysql.binlog.event.TableMapEventData
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
+import com.github.shyiko.mysql.binlog.network.SSLMode
import groovy.json.JsonSlurper
import org.apache.commons.io.output.WriterOutputStream
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
+import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.state.Scope
import org.apache.nifi.controller.AbstractControllerService
@@ -49,6 +51,7 @@ import org.apache.nifi.cdc.event.io.EventWriter
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.ssl.SSLContextService
import org.apache.nifi.state.MockStateManager
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockControllerServiceInitializationContext
@@ -58,6 +61,7 @@ import org.junit.After
import org.junit.Before
import org.junit.Test
+import javax.net.ssl.SSLContext
import java.sql.Connection
import java.sql.ResultSet
import java.sql.SQLException
@@ -67,8 +71,10 @@ import java.util.regex.Matcher
import java.util.regex.Pattern
import static org.junit.Assert.assertEquals
+import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertTrue
import static org.mockito.ArgumentMatchers.anyString
+import static org.mockito.Mockito.doReturn
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.when
@@ -93,6 +99,59 @@ class CaptureChangeMySQLTest {
}
@Test
+ void testSslModeDisabledSslContextServiceNotRequired() {
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.SSL_MODE,
SSLMode.DISABLED.toString())
+ testRunner.assertValid()
+ }
+
+ @Test
+ void testSslModeRequiredSslContextServiceRequired() {
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.SSL_MODE,
SSLMode.REQUIRED.toString())
+ testRunner.assertNotValid()
+ }
+
+ @Test
+ void testSslModeRequiredSslContextServiceConfigured() {
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.SSL_MODE,
SSLMode.REQUIRED.toString())
+
+ def identifier = SSLContextService.class.getName()
+ def sslContextService = mock(SSLContextService.class)
+ when(sslContextService.getIdentifier()).thenReturn(identifier)
+ testRunner.addControllerService(identifier, sslContextService)
+ testRunner.enableControllerService(sslContextService)
+
+ testRunner.setProperty(CaptureChangeMySQL.SSL_CONTEXT_SERVICE,
identifier)
+ testRunner.assertValid()
+ }
+
+ @Test
+ void testSslModeRequiredSslContextServiceConnected() {
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ def sslMode = SSLMode.REQUIRED
+ testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, sslMode.toString())
+
+ def sslContext = SSLContext.getDefault()
+ def identifier = SSLContextService.class.getName()
+ def sslContextService = mock(SSLContextService.class)
+ when(sslContextService.getIdentifier()).thenReturn(identifier)
+ doReturn(sslContext).when(sslContextService).createContext()
+
+ testRunner.addControllerService(identifier, sslContextService)
+ testRunner.enableControllerService(sslContextService)
+ testRunner.setProperty(CaptureChangeMySQL.SSL_CONTEXT_SERVICE,
identifier)
+ testRunner.assertValid()
+
+ testRunner.run()
+ assertEquals("SSL Mode not matched", sslMode, client.getSSLMode())
+ def sslSocketFactory = client.sslSocketFactory
+ assertNotNull('Binary Log SSLSocketFactory not found',
sslSocketFactory)
+ assertEquals('Binary Log SSLSocketFactory class not matched',
BinaryLogSSLSocketFactory.class, sslSocketFactory.getClass())
+ }
+
+ @Test
void testConnectionFailures() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION,
'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
@@ -903,8 +962,8 @@ class CaptureChangeMySQLTest {
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY,
'-1000', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY,
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
- ((CaptureChangeMySQL) testRunner.getProcessor()).clearState();
- testRunner.stateManager.clear(Scope.CLUSTER);
+ ((CaptureChangeMySQL) testRunner.getProcessor()).clearState()
+ testRunner.stateManager.clear(Scope.CLUSTER)
// Send some events, wait for the State Update Interval, and verify
the state was set
testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, '1
second')
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactoryTest.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactoryTest.java
new file mode 100644
index 0000000..439ec06
--- /dev/null
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/processors/ssl/BinaryLogSSLSocketFactoryTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.processors.ssl;
+
+import org.junit.Test;
+
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BinaryLogSSLSocketFactoryTest {
+
+ private static final int PORT = 65000;
+
+ @Test
+ public void testCreateSocket() throws IOException {
+ final SSLSocketFactory sslSocketFactory = (SSLSocketFactory)
SSLSocketFactory.getDefault();
+ final BinaryLogSSLSocketFactory socketFactory = new
BinaryLogSSLSocketFactory(sslSocketFactory);
+
+ final Socket socket = mock(Socket.class);
+ when(socket.isConnected()).thenReturn(true);
+ final InetAddress address = InetAddress.getLoopbackAddress();
+ when(socket.getInetAddress()).thenReturn(address);
+ when(socket.getPort()).thenReturn(PORT);
+
+ final SSLSocket sslSocket = socketFactory.createSocket(socket);
+ assertNotNull("SSL Socket not found", sslSocket);
+ assertEquals("Address not matched", address,
sslSocket.getInetAddress());
+ assertEquals("Port not matched", PORT, sslSocket.getPort());
+ }
+
+ @Test
+ public void testCreateSocketException() {
+ final SSLSocketFactory sslSocketFactory = (SSLSocketFactory)
SSLSocketFactory.getDefault();
+ final BinaryLogSSLSocketFactory socketFactory = new
BinaryLogSSLSocketFactory(sslSocketFactory);
+
+ final Socket socket = mock(Socket.class);
+ final InetAddress address = InetAddress.getLoopbackAddress();
+ when(socket.getInetAddress()).thenReturn(address);
+ when(socket.getPort()).thenReturn(PORT);
+
+ assertThrows(IOException.class, () ->
socketFactory.createSocket(socket));
+ }
+}