This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.19 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 0ed8114dcb05eefecb8f6c1234449f6f1a362d47 Author: Vasily Makarov <[email protected]> AuthorDate: Fri Jun 26 10:44:29 2020 +0300 NIFI-7190 CaptureChangeMySQL processor remove comments from normalized query This closes #6711 Co-authored-by: Vasily Makarov <[email protected]> Co-authored-by: Matt Burgess <[email protected]> Signed-off-by: David Handermann <[email protected]> --- .../cdc/mysql/processors/CaptureChangeMySQL.java | 16 +- .../mysql/processors/CaptureChangeMySQLTest.groovy | 35 ++ .../nifi/cdc/mysql/CaptureChangeMySQLTest.java | 393 --------------------- .../nifi/cdc/mysql/MockBinlogClientJava.java | 108 ------ 4 files changed, 50 insertions(+), 502 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 3e245c7ab0..08dad6dc40 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -157,6 +157,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { // Random invalid constant used as an indicator to not set the binlog position on the client (thereby using the latest available) private static final int DO_NOT_SET = -1000; + // A regular expression matching multiline comments, used when parsing DDL statements + private static final Pattern MULTI_COMMENT_PATTERN = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL); + // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -968,7 +971,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { currentTable = null; } else { // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change - String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " "); + String normalizedQuery = normalizeQuery(sql); + if (normalizedQuery.startsWith("alter table") || normalizedQuery.startsWith("alter ignore table") || normalizedQuery.startsWith("create table") @@ -1111,6 +1115,16 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { currentSession.clearState(Scope.CLUSTER); } + protected String normalizeQuery(String sql) { + String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " "); + + //Remove comments from the query + normalizedQuery = MULTI_COMMENT_PATTERN.matcher(normalizedQuery).replaceAll("").trim(); + normalizedQuery = normalizedQuery.replaceAll("#.*", ""); + normalizedQuery = normalizedQuery.replaceAll("-{2}.*", ""); + return normalizedQuery; + } + protected void stop() throws CDCException { try { if (binlogClient != null) { diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index 37231e533e..cc03e2aaa6 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -1185,6 +1185,40 @@ class CaptureChangeMySQLTest { ) } + @Test + void testGetXIDEvents() throws Exception { + testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION) + testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306") + testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root") + testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds") + testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true") + final DistributedMapCacheClientImpl cacheClient = createCacheClient() + Map<String, String> clientProperties = new HashMap<>() + clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost") + testRunner.addControllerService("client", cacheClient, clientProperties) + testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client") + testRunner.enableControllerService(cacheClient) + + testRunner.run(1, false, true) + // COMMIT + EventHeaderV4 header2 = new EventHeaderV4() + header2.setEventType(EventType.XID) + header2.setNextPosition(12) + header2.setTimestamp(new Date().getTime()) + EventData eventData = new EventData() { + }; + client.sendEvent(new Event(header2, eventData)); + + // when we ge a xid event without having got a 'begin' event ,throw an exception + assertThrows(AssertionError.class, () -> testRunner.run(1, false, false)) + } + + @Test + void testNormalizeQuery() throws Exception { + assertEquals("alter table", processor.normalizeQuery(" alter table")) + assertEquals("alter table", processor.normalizeQuery(" /* This is a \n multiline comment test */ alter table")) + } + /******************************** * Mock and helper classes below ********************************/ @@ -1224,6 +1258,7 @@ class CaptureChangeMySQLTest { when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet) return mockConnection } + } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java deleted file mode 100644 index 0d3fd0f257..0000000000 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/CaptureChangeMySQLTest.java +++ /dev/null @@ -1,393 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cdc.mysql; - - -import com.github.shyiko.mysql.binlog.BinaryLogClient; -import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.event.EventData; -import com.github.shyiko.mysql.binlog.event.EventHeaderV4; -import com.github.shyiko.mysql.binlog.event.EventType; -import com.github.shyiko.mysql.binlog.event.QueryEventData; -import com.github.shyiko.mysql.binlog.event.RotateEventData; -import com.github.shyiko.mysql.binlog.network.SSLMode; -import org.apache.commons.io.output.WriterOutputStream; -import org.apache.nifi.cdc.event.ColumnDefinition; -import org.apache.nifi.cdc.event.TableInfo; -import org.apache.nifi.cdc.event.TableInfoCacheKey; -import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.distributed.cache.client.Deserializer; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; -import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.reporting.InitializationException; -import org.apache.nifi.state.MockStateManager; -import org.apache.nifi.util.MockComponentLog; -import org.apache.nifi.util.MockControllerServiceInitializationContext; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.io.StringWriter; -import java.nio.charset.StandardCharsets; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeoutException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class CaptureChangeMySQLTest { - - private static final String DRIVER_LOCATION = "http://mysql-driver.com/driver.jar"; - CaptureChangeMySQL processor; - TestRunner testRunner; - MockBinlogClientJava client = new MockBinlogClientJava("localhost", 3306, "root", "password"); - - @BeforeEach - void setUp() throws Exception { - processor = new MockCaptureChangeMySQL(); - testRunner = TestRunners.newTestRunner(processor); - } - - @Test - void testConnectionFailures() throws Exception { - testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION); - testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306"); - testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root"); - testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, "1"); - final DistributedMapCacheClientImpl cacheClient = createCacheClient(); - Map<String, String> clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); - testRunner.addControllerService("client", cacheClient, clientProperties); - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client"); - testRunner.enableControllerService(cacheClient); - client.connectionError = true; - try { - testRunner.run(); - } catch (AssertionError ae) { - Throwable pe = ae.getCause(); - assertTrue(pe instanceof ProcessException); - Throwable ioe = pe.getCause(); - assertTrue(ioe instanceof IOException); - assertEquals("Could not connect binlog client to any of the specified hosts due to: Error during connect", ioe.getMessage()); - assertTrue(ioe.getCause() instanceof IOException); - } - client.connectionError = false; - - client.connectionTimeout = true; - try { - testRunner.run(); - } catch (AssertionError ae) { - Throwable pe = ae.getCause(); - assertTrue(pe instanceof ProcessException); - Throwable ioe = pe.getCause(); - assertTrue(ioe instanceof IOException); - assertEquals("Could not connect binlog client to any of the specified hosts due to: Connection timed out", ioe.getMessage()); - assertTrue(ioe.getCause() instanceof TimeoutException); - } - client.connectionTimeout = false; - } - - @Test - void testSslModeDisabledSslContextServiceNotRequired() { - testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306"); - testRunner.setProperty(CaptureChangeMySQL.SSL_MODE, SSLMode.DISABLED.toString()); - testRunner.assertValid(); - } - - @Test - void testGetXIDEvents() throws Exception { - testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION); - testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306"); - testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root"); - testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds"); - testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true"); - final DistributedMapCacheClientImpl cacheClient = createCacheClient(); - Map<String, String> clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); - testRunner.addControllerService("client", cacheClient, clientProperties); - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client"); - testRunner.enableControllerService(cacheClient); - - testRunner.run(1, false, true); - // COMMIT - EventHeaderV4 header2 = new EventHeaderV4(); - header2.setEventType(EventType.XID); - header2.setNextPosition(12); - header2.setTimestamp(new Date().getTime()); - EventData eventData = new EventData() { - }; - client.sendEvent(new Event(header2, eventData)); - - // when we ge a xid event without having got a 'begin' event ,throw an exception - assertThrows(AssertionError.class, () -> testRunner.run(1, false, false)); - } - - @Test - void testBeginCommitTransaction() throws Exception { - testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION); - testRunner.setProperty(CaptureChangeMySQL.HOSTS, "localhost:3306"); - testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root"); - testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds"); - testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true"); - final DistributedMapCacheClientImpl cacheClient = createCacheClient(); - Map<String, String> clientProperties = new HashMap<>(); - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost"); - testRunner.addControllerService("client", cacheClient, clientProperties); - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client"); - testRunner.enableControllerService(cacheClient); - - - testRunner.run(1, false, true); - - EventHeaderV4 header = new EventHeaderV4(); - header.setEventType(EventType.ROTATE); - header.setNextPosition(2); - header.setTimestamp(new Date().getTime()); - RotateEventData rotateEventData = new RotateEventData(); - rotateEventData.setBinlogFilename("mysql-bin.000001"); - rotateEventData.setBinlogPosition(4L); - client.sendEvent(new Event(header, rotateEventData)); - - // BEGIN - EventHeaderV4 header1 = new EventHeaderV4(); - header1.setEventType(EventType.QUERY); - header1.setNextPosition(6); - header1.setTimestamp(new Date().getTime()); - QueryEventData rotateEventData1 = new QueryEventData(); - rotateEventData1.setDatabase("mysql-bin.000001"); - rotateEventData1.setDatabase("myDB"); - rotateEventData1.setSql("BEGIN"); - client.sendEvent(new Event(header1, rotateEventData1)); - - // COMMIT - EventHeaderV4 header2 = new EventHeaderV4(); - header2.setEventType(EventType.XID); - header2.setNextPosition(12); - header2.setTimestamp(new Date().getTime()); - EventData eventData2 = new EventData() { - }; - client.sendEvent(new Event(header2, eventData2)); - - //when get a xid event,stop and restart the processor - //here we used to get an exception - testRunner.run(1, true, false); - testRunner.run(1, false, false); - - // next transaction - // BEGIN - EventHeaderV4 header3 = new EventHeaderV4(); - header3.setEventType(EventType.QUERY); - header3.setNextPosition(16); - header3.setTimestamp(new Date().getTime()); - QueryEventData rotateEventData3 = new QueryEventData(); - rotateEventData3.setDatabase("mysql-bin.000001"); - rotateEventData3.setDatabase("myDB"); - rotateEventData3.setSql("BEGIN"); - client.sendEvent(new Event(header3, rotateEventData3)); - - - testRunner.run(1, true, false); - } - - /******************************** - * Mock and helper classes below - ********************************/ - - static DistributedMapCacheClientImpl createCacheClient() throws InitializationException { - - final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl(); - final ComponentLog logger = new MockComponentLog("client", client); - final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client)); - - client.initialize(clientInitContext); - - return client; - } - - static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { - - private final Map<String, String> cacheMap = new HashMap<>(); - - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(DistributedMapCacheClientService.HOSTNAME); - descriptors.add(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT); - descriptors.add(DistributedMapCacheClientService.PORT); - descriptors.add(DistributedMapCacheClientService.SSL_CONTEXT_SERVICE); - return descriptors; - } - - @Override - public <K, V> boolean putIfAbsent( - final K key, - final V value, - final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { - - StringWriter keyWriter = new StringWriter(); - keySerializer.serialize(key, new WriterOutputStream(keyWriter)); - String keyString = keyWriter.toString(); - - if (cacheMap.containsKey(keyString)) return false; - - StringWriter valueWriter = new StringWriter(); - valueSerializer.serialize(value, new WriterOutputStream(valueWriter)); - return true; - } - - @Override - @SuppressWarnings("unchecked") - public <K, V> V getAndPutIfAbsent( - final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, - final Deserializer<V> valueDeserializer) throws IOException { - StringWriter keyWriter = new StringWriter(); - keySerializer.serialize(key, new WriterOutputStream(keyWriter)); - String keyString = keyWriter.toString(); - - if (cacheMap.containsKey(keyString)) - return valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8)); - - StringWriter valueWriter = new StringWriter(); - valueSerializer.serialize(value, new WriterOutputStream(valueWriter)); - return null; - } - - @Override - public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { - StringWriter keyWriter = new StringWriter(); - keySerializer.serialize(key, new WriterOutputStream(keyWriter)); - String keyString = keyWriter.toString(); - - return cacheMap.containsKey(keyString); - } - - @Override - public <K, V> V get( - final K key, - final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { - StringWriter keyWriter = new StringWriter(); - keySerializer.serialize(key, new WriterOutputStream(keyWriter)); - String keyString = keyWriter.toString(); - - return (cacheMap.containsKey(keyString)) ? valueDeserializer.deserialize(cacheMap.get(keyString).getBytes(StandardCharsets.UTF_8)) : null; - } - - @Override - public void close() throws IOException { - - } - - @Override - public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { - StringWriter keyWriter = new StringWriter(); - serializer.serialize(key, new WriterOutputStream(keyWriter)); - String keyString = keyWriter.toString(); - - boolean removed = (cacheMap.containsKey(keyString)); - cacheMap.remove(keyString); - return removed; - } - - @Override - public long removeByPattern(String regex) throws IOException { - final List<String> removedRecords = new ArrayList<>(); - Pattern p = Pattern.compile(regex); - for (String key : cacheMap.keySet()) { - // Key must be backed by something that can be converted into a String - Matcher m = p.matcher(key); - if (m.matches()) { - removedRecords.add(cacheMap.get(key)); - } - } - final long numRemoved = removedRecords.size(); - for (String it : removedRecords) { - cacheMap.remove(it); - } - return numRemoved; - } - - @Override - public <K, V> void put( - final K key, - final V value, - final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { - StringWriter keyWriter = new StringWriter(); - keySerializer.serialize(key, new WriterOutputStream(keyWriter)); - StringWriter valueWriter = new StringWriter(); - valueSerializer.serialize(value, new WriterOutputStream(valueWriter)); - } - } - - public class MockCaptureChangeMySQL extends CaptureChangeMySQL { - - Map<TableInfoCacheKey, TableInfo> cache = new HashMap<>(); - - public BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) { - return client; - } - - @Override - public TableInfo loadTableInfo(TableInfoCacheKey key) { - TableInfo tableInfo = cache.get(key); - if (tableInfo == null) { - List<ColumnDefinition> column = new ArrayList<>(); - column.add(new ColumnDefinition((byte) 4, "id")); - column.add(new ColumnDefinition((byte) -4, "string1")); - - tableInfo = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), column); - cache.put(key, tableInfo); - } - return tableInfo; - } - - @Override - protected void registerDriver(String locationString, String drvName) throws InitializationException { - } - - @Override - protected Connection getJdbcConnection() throws SQLException { - Connection mockConnection = mock(Connection.class); - Statement mockStatement = mock(Statement.class); - when(mockConnection.createStatement()).thenReturn(mockStatement); - ResultSet mockResultSet = mock(ResultSet.class); - when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet); - return mockConnection; - } - } -} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java deleted file mode 100644 index d23822292e..0000000000 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/MockBinlogClientJava.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cdc.mysql; - -import com.github.shyiko.mysql.binlog.BinaryLogClient; -import com.github.shyiko.mysql.binlog.event.Event; -import com.github.shyiko.mysql.binlog.network.SSLSocketFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeoutException; - -public class MockBinlogClientJava extends BinaryLogClient { - String hostname; - int port; - String username; - String password; - - boolean connected; - public boolean connectionTimeout = false; - public boolean connectionError = false; - - List<LifecycleListener> lifecycleListeners = new ArrayList<>(); - SSLSocketFactory sslSocketFactory; - - List<EventListener> eventListeners = new ArrayList<>(); - - - public MockBinlogClientJava(String hostname, int port, String username, String password) { - super(hostname, port, username, password); - this.hostname = hostname; - this.port = port; - this.username = username; - this.password = password; - } - - @Override - public void connect(long timeoutInMilliseconds) throws IOException, TimeoutException { - if (connectionTimeout) { - throw new TimeoutException("Connection timed out"); - } - if (connectionError) { - throw new IOException("Error during connect"); - } - if (password == null) { - throw new NullPointerException("Password can't be null"); - } - connected = true; - } - - @Override - public void disconnect() throws IOException { - connected = false; - } - - @Override - public boolean isConnected() { - return connected; - } - - @Override - public void registerEventListener(EventListener eventListener) { - eventListeners.add(eventListener); - } - - public void unregisterEventListener(EventListener eventListener) { - eventListeners.remove(eventListener); - } - - @Override - public void registerLifecycleListener(LifecycleListener lifecycleListener) { - if (!lifecycleListeners.contains(lifecycleListener)) { - lifecycleListeners.add(lifecycleListener); - } - } - - @Override - public void unregisterLifecycleListener(LifecycleListener lifecycleListener) { - lifecycleListeners.remove(lifecycleListener); - } - - @Override - public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) { - super.setSslSocketFactory(sslSocketFactory); - this.sslSocketFactory = sslSocketFactory; - } - - public void sendEvent(Event event) { - for (EventListener eventListener : eventListeners) { - eventListener.onEvent(event); - } - } -}
