This is an automated email from the ASF dual-hosted git repository.
nsabonyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 78fd7fadcd NIFI-11380: Refactor CaptureChangeMySQL with improvements
78fd7fadcd is described below
commit 78fd7fadcda5948fe8a7847d2c07f83373c63d1c
Author: Matthew Burgess <[email protected]>
AuthorDate: Mon Apr 3 16:14:35 2023 -0400
NIFI-11380: Refactor CaptureChangeMySQL with improvements
This closes #7116
Signed-off-by: Nandor Soma Abonyi <[email protected]>
---
.../java/org/apache/nifi/cdc/event/TableInfo.java | 68 ---
.../apache/nifi/cdc/event/TableInfoCacheKey.java | 21 -
.../nifi-cdc-mysql-processors/pom.xml | 2 +-
.../nifi/cdc/mysql/event/DataCaptureState.java | 82 +++
.../apache/nifi/cdc/mysql/event/MySQLCDCUtils.java | 47 --
.../cdc/mysql/event/handler/BeginEventHandler.java | 50 ++
.../mysql/event/handler/BinlogEventHandler.java | 37 ++
.../mysql/event/handler/CommitEventHandler.java | 63 +++
.../cdc/mysql/event/handler/DDLEventHandler.java | 52 ++
.../mysql/event/handler/DeleteEventHandler.java | 48 ++
.../mysql/event/handler/InsertEventHandler.java | 47 ++
.../mysql/event/handler/UpdateEventHandler.java | 48 ++
.../event/io/AbstractBinlogTableEventWriter.java | 24 +
.../nifi/cdc/mysql/event/io/DeleteRowsWriter.java | 3 +-
.../nifi/cdc/mysql/event/io/InsertRowsWriter.java | 3 +-
.../nifi/cdc/mysql/event/io/UpdateRowsWriter.java | 5 +-
.../cdc/mysql/processors/CaptureChangeMySQL.java | 630 ++++++++++-----------
.../mysql/processors/CaptureChangeMySQLTest.groovy | 209 +------
.../TestInsertRowsWriter.java} | 19 +-
19 files changed, 779 insertions(+), 679 deletions(-)
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
index d59306b537..ada962a175 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java
@@ -17,22 +17,14 @@
package org.apache.nifi.cdc.event;
import org.apache.commons.lang3.builder.EqualsBuilder;
-import
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
-import
org.apache.nifi.distributed.cache.client.exception.SerializationException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
/**
* A POJO for holding table information related to update events.
*/
public class TableInfo {
- final static String DB_TABLE_NAME_DELIMITER = "@!@";
-
private String databaseName;
private String tableName;
private Long tableId;
@@ -92,64 +84,4 @@ public class TableInfo {
result = 31 * result + (columns != null ? columns.hashCode() : 0);
return result;
}
-
- public static class Serializer implements
org.apache.nifi.distributed.cache.client.Serializer<TableInfo> {
-
- @Override
- public void serialize(TableInfo value, OutputStream output) throws
SerializationException, IOException {
- StringBuilder sb = new StringBuilder(value.getDatabaseName());
- sb.append(DB_TABLE_NAME_DELIMITER);
- sb.append(value.getTableName());
- sb.append(DB_TABLE_NAME_DELIMITER);
- sb.append(value.getTableId());
- List<ColumnDefinition> columnDefinitions = value.getColumns();
- if (columnDefinitions != null && !columnDefinitions.isEmpty()) {
- sb.append(DB_TABLE_NAME_DELIMITER);
- sb.append(columnDefinitions.stream().map((col) ->
col.getName() + DB_TABLE_NAME_DELIMITER +
col.getType()).collect(Collectors.joining(DB_TABLE_NAME_DELIMITER)));
- }
- output.write(sb.toString().getBytes());
- }
- }
-
- public static class Deserializer implements
org.apache.nifi.distributed.cache.client.Deserializer<TableInfo> {
-
- @Override
- public TableInfo deserialize(byte[] input) throws
DeserializationException, IOException {
- // Don't bother deserializing if empty, just return null. This
usually happens when the key is not found in the cache
- if (input == null || input.length == 0) {
- return null;
- }
- String inputString = new String(input);
- String[] tokens = inputString.split(DB_TABLE_NAME_DELIMITER);
- int numTokens = tokens.length;
- if (numTokens < 3) {
- throw new IOException("Could not deserialize TableInfo from
the following value: " + inputString);
- }
- String dbName = tokens[0];
- String tableName = tokens[1];
- Long tableId;
- try {
- tableId = Long.parseLong(tokens[2]);
- } catch (NumberFormatException nfe) {
- throw new IOException("Illegal table ID: " + tokens[2]);
- }
- // Parse column names and types
- List<ColumnDefinition> columnDefinitions = new ArrayList<>();
- for (int i = 0; i < numTokens - 3; i += 2) {
- try {
- int columnTypeIndex = i + 4;
- int columnNameIndex = i + 3;
- if (columnTypeIndex < numTokens) {
- columnDefinitions.add(new
ColumnDefinition(Integer.parseInt(tokens[columnTypeIndex]),
tokens[columnNameIndex]));
- } else {
- throw new IOException("No type detected for column: "
+ tokens[columnNameIndex]);
- }
- } catch (NumberFormatException nfe) {
- throw new IOException("Illegal column type value for
column " + (i / 2 + 1) + ": " + tokens[i + 4]);
- }
- }
-
- return new TableInfo(dbName, tableName, tableId,
columnDefinitions);
- }
- }
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
index 2f185e9b7e..81392642bf 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java
@@ -17,12 +17,6 @@
package org.apache.nifi.cdc.event;
import org.apache.commons.lang3.builder.EqualsBuilder;
-import
org.apache.nifi.distributed.cache.client.exception.SerializationException;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import static org.apache.nifi.cdc.event.TableInfo.DB_TABLE_NAME_DELIMITER;
/**
* This class represents a key in a cache that contains information (column
definitions, e.g.) for a database table
@@ -80,19 +74,4 @@ public class TableInfoCacheKey {
public String getUuidPrefix() {
return uuidPrefix;
}
-
- public static class Serializer implements
org.apache.nifi.distributed.cache.client.Serializer<TableInfoCacheKey> {
-
- @Override
- public void serialize(TableInfoCacheKey key, OutputStream output)
throws SerializationException, IOException {
- StringBuilder sb = new StringBuilder(key.getUuidPrefix());
- sb.append(DB_TABLE_NAME_DELIMITER);
- sb.append(key.getDatabaseName());
- sb.append(DB_TABLE_NAME_DELIMITER);
- sb.append(key.getTableName());
- sb.append(DB_TABLE_NAME_DELIMITER);
- sb.append(key.getTableId());
- output.write(sb.toString().getBytes());
- }
- }
}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
index 9dddc26a0c..d8cc4801e7 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml
@@ -41,7 +41,7 @@ language governing permissions and limitations under the
License. -->
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
- <version>0.27.6</version>
+ <version>0.28.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java
new file mode 100644
index 0000000000..08ad96c8c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event;
+
+public class DataCaptureState {
+
+ private String binlogFile = null;
+ private long binlogPosition = 4;
+
+ private boolean useGtid = false;
+ private String gtidSet = null;
+ private long sequenceId = 0L;
+
+ public DataCaptureState() {
+ }
+
+ public DataCaptureState(String binlogFile, long binlogPosition, boolean
useGtid, String gtidSet, long sequenceId) {
+ this.binlogFile = binlogFile;
+ this.binlogPosition = binlogPosition;
+ this.useGtid = useGtid;
+ this.gtidSet = gtidSet;
+ this.sequenceId = sequenceId;
+ }
+
+ public String getBinlogFile() {
+ return binlogFile;
+ }
+
+ public void setBinlogFile(String binlogFile) {
+ this.binlogFile = binlogFile;
+ }
+
+ public long getBinlogPosition() {
+ return binlogPosition;
+ }
+
+ public void setBinlogPosition(long binlogPosition) {
+ this.binlogPosition = binlogPosition;
+ }
+
+ public boolean isUseGtid() {
+ return useGtid;
+ }
+
+ public void setUseGtid(boolean useGtid) {
+ this.useGtid = useGtid;
+ }
+
+ public String getGtidSet() {
+ return gtidSet;
+ }
+
+ public void setGtidSet(String gtidSet) {
+ this.gtidSet = gtidSet;
+ }
+
+ public long getSequenceId() {
+ return sequenceId;
+ }
+
+ public void setSequenceId(long sequenceId) {
+ this.sequenceId = sequenceId;
+ }
+
+ public DataCaptureState copy() {
+ return new DataCaptureState(binlogFile, binlogPosition, useGtid,
gtidSet, sequenceId);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
deleted file mode 100644
index 313bfce763..0000000000
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.cdc.mysql.event;
-
-import java.io.Serializable;
-
-/**
- * A utility class to provide MySQL- / binlog-specific constants and methods
for processing events and data
- */
-public class MySQLCDCUtils {
-
- public static Object getWritableObject(Integer type, Serializable value) {
- if (value == null) {
- return null;
- }
- if (type == null) {
- if (value instanceof byte[]) {
- return new String((byte[]) value);
- } else if (value instanceof Number) {
- return value;
- }
- } else if (value instanceof Number) {
- return value;
- } else {
- if (value instanceof byte[]) {
- return new String((byte[]) value);
- } else {
- return value.toString();
- }
- }
- return null;
- }
-}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java
new file mode 100644
index 0000000000..a58319d20f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class BeginEventHandler implements BinlogEventHandler<QueryEventData,
BeginTransactionEventInfo> {
+
+ private final BeginTransactionEventWriter eventWriter = new
BeginTransactionEventWriter();
+
+ @Override
+ public void handleEvent(final QueryEventData eventData, final boolean
writeEvent, final DataCaptureState dataCaptureState,
+ final CaptureChangeMySQL.BinlogResourceInfo
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+ final String sql, final EventWriterConfiguration
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+ if (writeEvent) {
+ final String currentDatabase = eventData.getDatabase();
+ final BeginTransactionEventInfo beginEvent =
dataCaptureState.isUseGtid()
+ ? new BeginTransactionEventInfo(currentDatabase,
timestamp, dataCaptureState.getGtidSet())
+ : new BeginTransactionEventInfo(currentDatabase,
timestamp, dataCaptureState.getBinlogFile(),
dataCaptureState.getBinlogPosition());
+
+ binlogEventState.setCurrentEventInfo(beginEvent);
+ binlogEventState.setCurrentEventWriter(eventWriter);
+ dataCaptureState.setSequenceId(eventWriter.writeEvent(session,
binlogResourceInfo.getTransitUri(), beginEvent,
dataCaptureState.getSequenceId(),
+ REL_SUCCESS, eventWriterConfiguration));
+ }
+ binlogResourceInfo.setInTransaction(true);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java
new file mode 100644
index 0000000000..44b8d505e7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+public interface BinlogEventHandler<T extends EventData, S extends
BinlogEventInfo> {
+
+ void handleEvent(final T eventData,
+ final boolean writeEvent,
+ final DataCaptureState dataCaptureState,
+ final CaptureChangeMySQL.BinlogResourceInfo
binlogResourceInfo,
+ final CaptureChangeMySQL.BinlogEventState
binlogEventState,
+ final String sql,
+ final EventWriterConfiguration eventWriterConfiguration,
+ final ProcessSession session,
+ final long timestamp);
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java
new file mode 100644
index 0000000000..14027204ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.EventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import static
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class CommitEventHandler implements BinlogEventHandler<EventData,
CommitTransactionEventInfo> {
+
+ private final CommitTransactionEventWriter eventWriter = new
CommitTransactionEventWriter();
+
+ @Override
+ public void handleEvent(final EventData eventData, final boolean
writeEvent, final DataCaptureState dataCaptureState,
+ final CaptureChangeMySQL.BinlogResourceInfo
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+ final String sql, final EventWriterConfiguration
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+ final String currentDatabase = binlogResourceInfo.getCurrentDatabase();
+ final CommitTransactionEventInfo commitEvent =
dataCaptureState.isUseGtid()
+ ? new CommitTransactionEventInfo(currentDatabase, timestamp,
dataCaptureState.getGtidSet())
+ : new CommitTransactionEventInfo(currentDatabase, timestamp,
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition());
+
+ if (writeEvent) {
+ binlogEventState.setCurrentEventInfo(commitEvent);
+ binlogEventState.setCurrentEventWriter(eventWriter);
+ dataCaptureState.setSequenceId(eventWriter.writeEvent(session,
binlogResourceInfo.getTransitUri(), commitEvent,
dataCaptureState.getSequenceId(),
+ REL_SUCCESS, eventWriterConfiguration));
+ } else {
+ // If the COMMIT event is not to be written, the FlowFile should
still be finished and the session committed.
+ if (session != null) {
+ FlowFile flowFile =
eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null) {
+ // Flush the events to the FlowFile when the processor is
stopped
+ eventWriter.finishAndTransferFlowFile(session,
eventWriterConfiguration, binlogResourceInfo.getTransitUri(),
dataCaptureState.getSequenceId(), commitEvent, REL_SUCCESS);
+ }
+ session.commitAsync();
+ }
+ }
+
+ // Update inTransaction value to state
+ binlogResourceInfo.setInTransaction(false);
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java
new file mode 100644
index 0000000000..698d7fe011
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.QueryEventData;
+import org.apache.nifi.cdc.event.TableInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DDLEventHandler implements BinlogEventHandler<QueryEventData,
DDLEventInfo> {
+
+ private final DDLEventWriter eventWriter = new DDLEventWriter();
+
+ @Override
+ public void handleEvent(final QueryEventData eventData, final boolean
writeEvent, final DataCaptureState dataCaptureState,
+ final CaptureChangeMySQL.BinlogResourceInfo
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+ final String sql, final EventWriterConfiguration
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+ if (writeEvent) {
+ final TableInfo ddlTableInfo =
binlogResourceInfo.getCurrentTable() != null
+ ? binlogResourceInfo.getCurrentTable()
+ : new TableInfo(binlogResourceInfo.getCurrentDatabase(),
null, null, null);
+ final DDLEventInfo ddlEvent = dataCaptureState.isUseGtid()
+ ? new DDLEventInfo(ddlTableInfo, timestamp,
dataCaptureState.getGtidSet(), sql)
+ : new DDLEventInfo(ddlTableInfo, timestamp,
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), sql);
+
+ binlogEventState.setCurrentEventInfo(ddlEvent);
+ binlogEventState.setCurrentEventWriter(eventWriter);
+ dataCaptureState.setSequenceId(eventWriter.writeEvent(session,
binlogResourceInfo.getTransitUri(), ddlEvent, dataCaptureState.getSequenceId(),
+ REL_SUCCESS, eventWriterConfiguration));
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java
new file mode 100644
index 0000000000..453a779aaf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class DeleteEventHandler implements
BinlogEventHandler<DeleteRowsEventData, DeleteRowsEventInfo> {
+
+ private final DeleteRowsWriter eventWriter = new DeleteRowsWriter();
+
+ @Override
+ public void handleEvent(final DeleteRowsEventData eventData, final boolean
writeEvent, final DataCaptureState dataCaptureState,
+ final CaptureChangeMySQL.BinlogResourceInfo
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+ final String sql, final EventWriterConfiguration
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+ if (writeEvent) {
+ final DeleteRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
+ ? new
DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
dataCaptureState.getGtidSet(), eventData)
+ : new
DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(),
eventData);
+
+ binlogEventState.setCurrentEventInfo(eventInfo);
+ binlogEventState.setCurrentEventWriter(eventWriter);
+ dataCaptureState.setSequenceId(eventWriter.writeEvent(session,
binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(),
+ REL_SUCCESS, eventWriterConfiguration));
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java
new file mode 100644
index 0000000000..0feae1f277
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+public class InsertEventHandler implements
BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {
+
+ private final InsertRowsWriter eventWriter = new InsertRowsWriter();
+ @Override
+ public void handleEvent(final WriteRowsEventData eventData, final boolean
writeEvent, final DataCaptureState dataCaptureState,
+ final CaptureChangeMySQL.BinlogResourceInfo
binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+ final String sql, final EventWriterConfiguration
eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+ if (writeEvent) {
+ final InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
+ ? new
InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
dataCaptureState.getGtidSet(), eventData)
+ : new
InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(),
eventData);
+
+ binlogEventState.setCurrentEventInfo(eventInfo);
+ binlogEventState.setCurrentEventWriter(eventWriter);
+ dataCaptureState.setSequenceId(eventWriter.writeEvent(session,
binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(),
+ REL_SUCCESS, eventWriterConfiguration));
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java
new file mode 100644
index 0000000000..2d538af06e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.handler;
+
+import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
+import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
+import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
+import org.apache.nifi.processor.ProcessSession;
+
+import static
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
+
+/**
+ * Binlog event handler for row-update events ({@link UpdateRowsEventData}).
+ * When asked to write, it builds an {@link UpdateRowsEventInfo} for the current table,
+ * records that event and its writer on the shared event state, and emits the event
+ * through an {@link UpdateRowsWriter}.
+ */
+public class UpdateEventHandler implements BinlogEventHandler<UpdateRowsEventData, UpdateRowsEventInfo> {
+
+    private final UpdateRowsWriter eventWriter = new UpdateRowsWriter();
+
+    /**
+     * Writes the update event to {@code REL_SUCCESS} and stores the sequence ID returned by the writer
+     * back into {@code dataCaptureState}. Does nothing when {@code writeEvent} is false.
+     * The {@code sql} argument is not used by this handler.
+     */
+    @Override
+    public void handleEvent(final UpdateRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
+                            final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
+                            final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
+        if (!writeEvent) {
+            return;
+        }
+
+        // The event is identified by the GTID set when GTID tracking is on, otherwise by binlog file and position
+        final UpdateRowsEventInfo eventInfo;
+        if (dataCaptureState.isUseGtid()) {
+            eventInfo = new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData);
+        } else {
+            eventInfo = new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp,
+                    dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData);
+        }
+
+        // Publish the in-flight event and its writer on the shared state before writing
+        binlogEventState.setCurrentEventInfo(eventInfo);
+        binlogEventState.setCurrentEventWriter(eventWriter);
+
+        final long nextSequenceId = eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo,
+                dataCaptureState.getSequenceId(), REL_SUCCESS, eventWriterConfiguration);
+        dataCaptureState.setSequenceId(nextSequenceId);
+    }
+}
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
index f618a76a6f..013b63edf9 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
@@ -19,12 +19,36 @@ package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
import java.io.IOException;
+import java.io.Serializable;
/**
* An abstract base class for writing MYSQL table-related binlog events into
flow file(s), e.g.
*/
public abstract class AbstractBinlogTableEventWriter<T extends
BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
+    /**
+     * Converts a raw column value into an object suitable for writing as a JSON field value.
+     * <ul>
+     *   <li>{@code null} stays {@code null}</li>
+     *   <li>{@code byte[]} is decoded to a {@link String} as UTF-8</li>
+     *   <li>{@link Number} instances are returned unchanged</li>
+     *   <li>anything else becomes {@code value.toString()} when {@code type} is known,
+     *       or {@code null} when {@code type} is {@code null}</li>
+     * </ul>
+     *
+     * @param type  the column type supplied by the caller, or {@code null} if it is not known
+     * @param value the raw column value, may be {@code null}
+     * @return the writable representation of {@code value}, or {@code null}
+     */
+    protected Object getWritableObject(Integer type, Serializable value) {
+        if (value == null) {
+            return null;
+        }
+        if (value instanceof byte[]) {
+            // Pin the charset: new String(byte[]) would use the JVM's platform default,
+            // making the emitted text depend on host configuration and Java version.
+            return new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8);
+        }
+        if (value instanceof Number) {
+            return value;
+        }
+        // Without a column type, other values are not rendered as text
+        return type == null ? null : value.toString();
+    }
+
protected void writeJson(T event) throws IOException {
super.writeJson(event);
if (event.getDatabaseName() != null) {
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
index 4828ce8662..8bf4b0220b 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
@@ -17,7 +17,6 @@
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
-import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
@@ -89,7 +88,7 @@ public class DeleteRowsWriter extends
AbstractBinlogTableEventWriter<DeleteRowsE
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
- jsonGenerator.writeObjectField("value",
MySQLCDCUtils.getWritableObject(columnType, row[i]));
+ jsonGenerator.writeObjectField("value",
getWritableObject(columnType, row[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
index 4453889d53..b4fa31f787 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
@@ -17,7 +17,6 @@
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
-import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
@@ -90,7 +89,7 @@ public class InsertRowsWriter extends
AbstractBinlogTableEventWriter<InsertRowsE
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
- jsonGenerator.writeObjectField("value",
MySQLCDCUtils.getWritableObject(columnType, row[i]));
+ jsonGenerator.writeObjectField("value",
getWritableObject(columnType, row[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
index 4a9a2d9785..c860ffdaab 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
@@ -17,7 +17,6 @@
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
-import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
@@ -95,13 +94,13 @@ public class UpdateRowsWriter extends
AbstractBinlogTableEventWriter<UpdateRowsE
if (oldRow[i] == null) {
jsonGenerator.writeNullField("last_value");
} else {
- jsonGenerator.writeObjectField("last_value",
MySQLCDCUtils.getWritableObject(columnType, oldRow[i]));
+ jsonGenerator.writeObjectField("last_value",
getWritableObject(columnType, oldRow[i]));
}
if (newRow[i] == null) {
jsonGenerator.writeNullField("value");
} else {
- jsonGenerator.writeObjectField("value",
MySQLCDCUtils.getWritableObject(columnType, newRow[i]));
+ jsonGenerator.writeObjectField("value",
getWritableObject(columnType, newRow[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 76ada9b306..d63ac8bc15 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -22,6 +22,7 @@ import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
+import com.github.shyiko.mysql.binlog.event.MySqlGtid;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -36,7 +37,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
@@ -45,25 +45,20 @@ import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.event.RowEventException;
import org.apache.nifi.cdc.event.TableInfo;
import org.apache.nifi.cdc.event.TableInfoCacheKey;
-import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventListener;
import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener;
-import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
-import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
-import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
+import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
-import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
-import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
-import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
-import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
-import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
+import org.apache.nifi.cdc.mysql.event.handler.BeginEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.CommitEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.DDLEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.DeleteEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.InsertEventHandler;
+import org.apache.nifi.cdc.mysql.event.handler.UpdateEventHandler;
import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory;
import org.apache.nifi.cdc.mysql.processors.ssl.ConnectionPropertiesProvider;
import
org.apache.nifi.cdc.mysql.processors.ssl.StandardConnectionPropertiesProvider;
@@ -77,9 +72,7 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
-import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -96,7 +89,6 @@ import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.Driver;
@@ -120,8 +112,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.regex.Pattern;
@@ -166,6 +156,8 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
// Random invalid constant used as an indicator to not set the binlog
position on the client (thereby using the latest available)
private static final int DO_NOT_SET = -1000;
+ private static final int DEFAULT_MYSQL_PORT = 3306;
+
// A regular expression matching multiline comments, used when parsing DDL
statements
private static final Pattern MULTI_COMMENT_PATTERN =
Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);
@@ -228,13 +220,14 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
public static final PropertyDescriptor HOSTS = new
PropertyDescriptor.Builder()
.name("capture-change-mysql-hosts")
- .displayName("MySQL Hosts")
- .description("A list of hostname/port entries corresponding to
nodes in a MySQL cluster. The entries should be comma separated "
- + "using a colon such as host1:port,host2:port,.... For
example mysql.myhost.com:3306. This processor will attempt to connect to "
+ .displayName("MySQL Nodes")
+ .description("A list of hostname (and optional port) entries
corresponding to nodes in a MySQL cluster. The entries should be comma
separated "
+ + "using a colon (if the port is to be specified) such as
host1:port,host2:port,.... For example mysql.myhost.com:3306. The port need
not be specified, "
+ + "when omitted the default MySQL port value of 3306 will
be used. This processor will attempt to connect to "
+ "the hosts in the list in order. If one node goes down
and failover is enabled for the cluster, then the processor will connect "
- + "to the active node (assuming its host entry is
specified in this property. The default port for MySQL connections is 3306.")
+ + "to the active node (assuming its node entry is
specified in this property).")
.required(true)
- .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@@ -317,10 +310,8 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
public static final PropertyDescriptor DIST_CACHE_CLIENT = new
PropertyDescriptor.Builder()
.name("capture-change-mysql-dist-map-cache-client")
- .displayName("Distributed Map Cache Client")
- .description("Identifies a Distributed Map Cache Client controller
service to be used for keeping information about the various table columns,
datatypes, etc. "
- + "needed by the processor. If a client is not specified,
the generated events will not include column type or name information (but they
will include database "
- + "and table information.")
+ .displayName("Distributed Map Cache Client - unused")
+ .description("This is a legacy property that is no longer used to
store table information, the processor will handle the table information
(column names, types, etc.)")
.identifiesControllerService(DistributedMapCacheClient.class)
.required(false)
.build();
@@ -464,58 +455,38 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
private static final List<PropertyDescriptor> propDescriptors;
- private volatile ProcessSession currentSession;
- private BinaryLogClient binlogClient;
- private BinlogEventListener eventListener;
- private BinlogLifecycleListener lifecycleListener;
- private GtidSet gtidSet;
+ private volatile BinaryLogClient binlogClient;
+ private volatile BinlogEventListener eventListener;
+ private volatile BinlogLifecycleListener lifecycleListener;
+ private volatile GtidSet gtidSet;
// Set queue capacity to avoid excessive memory consumption
private final BlockingQueue<RawBinlogEvent> queue = new
LinkedBlockingQueue<>(1000);
- private volatile String currentBinlogFile = null;
- private volatile long currentBinlogPosition = 4;
- private volatile String currentGtidSet = null;
-
- // The following variables save the value of the binlog filename,
position, (sequence id), and gtid at the beginning of a transaction. Used for
rollback
- private volatile String xactBinlogFile = null;
- private volatile long xactBinlogPosition = 4;
- private volatile long xactSequenceId = 0;
- private volatile String xactGtidSet = null;
-
- private volatile TableInfo currentTable = null;
- private volatile String currentDatabase = null;
- private volatile Pattern databaseNamePattern;
- private volatile Pattern tableNamePattern;
- private volatile boolean includeBeginCommit = false;
- private volatile boolean includeDDLEvents = false;
- private volatile boolean useGtid = false;
- private volatile boolean inTransaction = false;
- private volatile boolean skipTable = false;
- private final AtomicBoolean hasRun = new AtomicBoolean(false);
+ private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new
HashMap<>();
- private int currentHost = 0;
- private String transitUri = "<unknown>";
+ private volatile ProcessSession currentSession;
+ private DataCaptureState currentDataCaptureState = new DataCaptureState();
- private final AtomicLong currentSequenceId = new AtomicLong(0);
+ private volatile BinlogResourceInfo binlogResourceInfo = new
BinlogResourceInfo();
- private volatile DistributedMapCacheClient cacheClient = null;
- private final Serializer<TableInfoCacheKey> cacheKeySerializer = new
TableInfoCacheKey.Serializer();
- private final Serializer<TableInfo> cacheValueSerializer = new
TableInfo.Serializer();
- private final Deserializer<TableInfo> cacheValueDeserializer = new
TableInfo.Deserializer();
+ private volatile Pattern databaseNamePattern;
+ private volatile Pattern tableNamePattern;
+ private volatile boolean skipTable = false;
+ private int currentHost = 0;
+ private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
- private JDBCConnectionHolder jdbcConnectionHolder = null;
+ private final BinlogEventState binlogEventState = new BinlogEventState();
- private final BeginTransactionEventWriter beginEventWriter = new
BeginTransactionEventWriter();
- private final CommitTransactionEventWriter commitEventWriter = new
CommitTransactionEventWriter();
- private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
- private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
- private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
- private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
+ private final BeginEventHandler beginEventHandler = new
BeginEventHandler();
+ private final CommitEventHandler commitEventHandler = new
CommitEventHandler();
+ private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
+ private final InsertEventHandler insertEventHandler = new
InsertEventHandler();
+ private final DeleteEventHandler deleteEventHandler = new
DeleteEventHandler();
+ private final UpdateEventHandler updateEventHandler = new
UpdateEventHandler();
private volatile EventWriterConfiguration eventWriterConfiguration;
- private volatile BinlogEventInfo currentEventInfo;
- private AbstractBinlogEventWriter<? extends BinlogEventInfo>
currentEventWriter;
+
static {
@@ -609,6 +580,9 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
return;
}
+ // Get inTransaction value from state
+
binlogResourceInfo.setInTransaction("true".equals(stateMap.get("inTransaction")));
+
// Build a event writer config object for the event writers to use
final FlowFileEventWriteStrategy flowFileEventWriteStrategy =
FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
eventWriterConfiguration = new EventWriterConfiguration(
@@ -624,49 +598,52 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
boolean getAllRecords =
context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
- includeBeginCommit =
context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
- includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
- useGtid = context.getProperty(USE_BINLOG_GTID).asBoolean();
+ if (binlogResourceInfo == null) {
+ binlogResourceInfo = new BinlogResourceInfo();
+ }
+
currentDataCaptureState.setUseGtid(context.getProperty(USE_BINLOG_GTID).asBoolean());
- if (useGtid) {
+ if (currentDataCaptureState.isUseGtid()) {
// Set current gtid to whatever is in State, falling back to the
Retrieve All Records then Initial Gtid if no State variable is present
- currentGtidSet = stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY);
- if (currentGtidSet == null) {
+
currentDataCaptureState.setGtidSet(stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY));
+ if (currentDataCaptureState.getGtidSet() == null) {
if (!getAllRecords &&
context.getProperty(INIT_BINLOG_GTID).isSet()) {
- currentGtidSet =
context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue();
+
currentDataCaptureState.setGtidSet(context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue());
} else {
// If we're starting from the beginning of all binlogs,
the binlog gtid must be the empty string (not null)
- currentGtidSet = "";
+ currentDataCaptureState.setGtidSet("");
}
}
- currentBinlogFile = "";
- currentBinlogPosition = DO_NOT_SET;
+ currentDataCaptureState.setBinlogFile("");
+ currentDataCaptureState.setBinlogPosition(DO_NOT_SET);
} else {
// Set current binlog filename to whatever is in State, falling
back to the Retrieve All Records then Initial Binlog Filename if no State
variable is present
- currentBinlogFile =
stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
+ final String currentBinlogFile =
stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
if (currentBinlogFile == null) {
if (!getAllRecords) {
if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
- currentBinlogFile =
context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
+
currentDataCaptureState.setBinlogFile(context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue());
}
} else {
// If we're starting from the beginning of all binlogs,
the binlog filename must be the empty string (not null)
- currentBinlogFile = "";
+ currentDataCaptureState.setBinlogFile("");
}
+ } else {
+ currentDataCaptureState.setBinlogFile(currentBinlogFile);
}
// Set current binlog position to whatever is in State, falling
back to the Retrieve All Records then Initial Binlog Filename if no State
variable is present
- String binlogPosition =
stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
+ final String binlogPosition =
stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
if (binlogPosition != null) {
- currentBinlogPosition = Long.valueOf(binlogPosition);
+
currentDataCaptureState.setBinlogPosition(Long.parseLong(binlogPosition));
} else if (!getAllRecords) {
if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
- currentBinlogPosition =
context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
+
currentDataCaptureState.setBinlogPosition(context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong());
} else {
- currentBinlogPosition = DO_NOT_SET;
+ currentDataCaptureState.setBinlogPosition(DO_NOT_SET);
}
} else {
- currentBinlogPosition = -1;
+ currentDataCaptureState.setBinlogPosition(-1);
}
}
@@ -676,22 +653,10 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
// Use Initial Sequence ID property if none is found in state
PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID);
if (seqIdProp.isSet()) {
-
currentSequenceId.set(seqIdProp.evaluateAttributeExpressions().asInteger());
+
currentDataCaptureState.setSequenceId(seqIdProp.evaluateAttributeExpressions().asInteger());
}
} else {
- currentSequenceId.set(Long.parseLong(seqIdString));
- }
- //get inTransaction value from state
- inTransaction = "true".equals(stateMap.get("inTransaction"));
-
- // Get reference to Distributed Cache if one exists. If it does not,
no enrichment (resolution of column names, e.g.) will be performed
- boolean createEnrichmentConnection = false;
- if (context.getProperty(DIST_CACHE_CLIENT).isSet()) {
- cacheClient =
context.getProperty(DIST_CACHE_CLIENT).asControllerService(DistributedMapCacheClient.class);
- createEnrichmentConnection = true;
- } else {
- logger.warn("No Distributed Map Cache Client is specified, so no
event enrichment (resolution of column names, e.g.) will be performed.");
- cacheClient = null;
+ currentDataCaptureState.setSequenceId(Long.parseLong(seqIdString));
}
final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE)
@@ -717,7 +682,7 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
Long serverId =
context.getProperty(SERVER_ID).evaluateAttributeExpressions().asLong();
- connect(hosts, username, password, serverId,
createEnrichmentConnection, driverLocation, driverName, connectTimeout,
sslContextService, sslMode);
+ connect(hosts, username, password, serverId, driverLocation,
driverName, connectTimeout, sslContextService, sslMode);
} catch (IOException | IllegalStateException e) {
if (eventListener != null) {
eventListener.stop();
@@ -735,8 +700,6 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
@Override
public synchronized void onTrigger(ProcessContext context,
ProcessSessionFactory sessionFactory) throws ProcessException {
- // Indicate that this processor has executed at least once, so we know
whether or not the state values are valid and should be updated
- hasRun.set(true);
ComponentLog log = getLogger();
// Create a client if we don't have one
@@ -773,16 +736,12 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
try {
- outputEvents(currentSession, log);
+ outputEvents(currentSession, context, log);
} catch (Exception eventException) {
- getLogger().error("Exception during event processing at file={}
pos={}", currentBinlogFile, currentBinlogPosition, eventException);
+ getLogger().error("Exception during event processing at file={}
pos={}", currentDataCaptureState.getBinlogFile(),
currentDataCaptureState.getBinlogPosition(), eventException);
try {
// Perform some processor-level "rollback", then rollback the
session
- currentBinlogFile = xactBinlogFile == null ? "" :
xactBinlogFile;
- currentBinlogPosition = xactBinlogPosition;
- currentSequenceId.set(xactSequenceId);
- currentGtidSet = xactGtidSet;
- inTransaction = false;
+ binlogResourceInfo.setInTransaction(false);
stop();
} catch (Exception e) {
// Not much we can recover from here
@@ -795,16 +754,6 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
}
- @OnStopped
- @OnShutdown
- public void onStopped(ProcessContext context) {
- try {
- stop();
- } catch (CDCException ioe) {
- throw new ProcessException(ioe);
- }
- }
-
/**
* Get a list of hosts from a NiFi property, e.g.
*
@@ -821,16 +770,19 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
for (String item : hostsSplit) {
String[] addresses = item.split(":");
- if (addresses.length != 2) {
+ if (addresses.length > 2 || addresses.length == 0) {
throw new ArrayIndexOutOfBoundsException("Not in host:port
format");
+ } else if (addresses.length > 1) {
+ hostsList.add(new InetSocketAddress(addresses[0].trim(),
Integer.parseInt(addresses[1].trim())));
+ } else {
+ // Assume default port of 3306
+ hostsList.add(new InetSocketAddress(addresses[0].trim(),
DEFAULT_MYSQL_PORT));
}
-
- hostsList.add(new InetSocketAddress(addresses[0].trim(),
Integer.parseInt(addresses[1].trim())));
}
return hostsList;
}
- protected void connect(List<InetSocketAddress> hosts, String username,
String password, Long serverId, boolean createEnrichmentConnection,
+ protected void connect(List<InetSocketAddress> hosts, String username,
String password, Long serverId,
String driverLocation, String driverName, long
connectTimeout,
final SSLContextService sslContextService, final
SSLMode sslMode) throws IOException {
@@ -839,17 +791,15 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
InetSocketAddress connectedHost = null;
Exception lastConnectException = new Exception("Unknown connection
error");
- if (createEnrichmentConnection) {
- try {
- // Ensure driverLocation and driverName are correct before
establishing binlog connection
- // to avoid failing after binlog messages are received.
- // Actual JDBC connection is created after binlog client gets
started, because we need
- // the connect-able host same as the binlog client.
- registerDriver(driverLocation, driverName);
- } catch (InitializationException e) {
- throw new RuntimeException("Failed to register JDBC driver.
Ensure MySQL Driver Location(s)" +
- " and MySQL Driver Class Name are configured
correctly. " + e, e);
- }
+ try {
+ // Ensure driverLocation and driverName are correct before
establishing binlog connection
+ // to avoid failing after binlog messages are received.
+ // Actual JDBC connection is created after binlog client gets
started, because we need
+ // the connect-able host same as the binlog client.
+ registerDriver(driverLocation, driverName);
+ } catch (InitializationException e) {
+ throw new RuntimeException("Failed to register JDBC driver. Ensure
MySQL Driver Location(s)" +
+ " and MySQL Driver Class Name are configured correctly. "
+ e, e);
}
while (connectedHost == null && connectionAttempts < numHosts) {
@@ -871,12 +821,12 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
binlogClient.registerLifecycleListener(lifecycleListener);
- binlogClient.setBinlogFilename(currentBinlogFile);
- if (currentBinlogPosition != DO_NOT_SET) {
- binlogClient.setBinlogPosition(currentBinlogPosition);
+
binlogClient.setBinlogFilename(currentDataCaptureState.getBinlogFile());
+ if (currentDataCaptureState.getBinlogPosition() != DO_NOT_SET) {
+
binlogClient.setBinlogPosition(currentDataCaptureState.getBinlogPosition());
}
- binlogClient.setGtidSet(currentGtidSet);
+ binlogClient.setGtidSet(currentDataCaptureState.getGtidSet());
binlogClient.setGtidSetFallbackToPurged(true);
if (serverId != null) {
@@ -895,12 +845,12 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
connectTimeout = Long.MAX_VALUE;
}
binlogClient.connect(connectTimeout);
- transitUri = "mysql://" + connectedHost.getHostString() + ":"
+ connectedHost.getPort();
+ binlogResourceInfo.setTransitUri("mysql://" +
connectedHost.getHostString() + ":" + connectedHost.getPort());
} catch (IOException | TimeoutException te) {
// Try the next host
connectedHost = null;
- transitUri = "<unknown>";
+ binlogResourceInfo.setTransitUri("<unknown>");
currentHost = (currentHost + 1) % numHosts;
connectionAttempts++;
lastConnectException = te;
@@ -918,36 +868,37 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
throw new IOException("Could not connect binlog client to any of
the specified hosts due to: " + lastConnectException.getMessage(),
lastConnectException);
}
- if (createEnrichmentConnection) {
- final TlsConfiguration tlsConfiguration = sslContextService ==
null ? null : sslContextService.createTlsConfiguration();
- final ConnectionPropertiesProvider connectionPropertiesProvider =
new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration);
- final Map<String, String> jdbcConnectionProperties =
connectionPropertiesProvider.getConnectionProperties();
- jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost,
username, password, jdbcConnectionProperties, connectTimeout);
- try {
- // Ensure connection can be created.
- getJdbcConnection();
- } catch (SQLException e) {
- getLogger().error("Error creating binlog enrichment JDBC
connection to any of the specified hosts", e);
- if (eventListener != null) {
- eventListener.stop();
- if (binlogClient != null) {
- binlogClient.unregisterEventListener(eventListener);
- }
- }
+ final TlsConfiguration tlsConfiguration = sslContextService == null ?
null : sslContextService.createTlsConfiguration();
+ final ConnectionPropertiesProvider connectionPropertiesProvider = new
StandardConnectionPropertiesProvider(sslMode, tlsConfiguration);
+ final Map<String, String> jdbcConnectionProperties =
connectionPropertiesProvider.getConnectionProperties();
+ jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost,
username, password, jdbcConnectionProperties, connectTimeout);
+ try {
+ // Ensure connection can be created.
+ getJdbcConnection();
+ } catch (SQLException e) {
+ getLogger().error("Error creating binlog enrichment JDBC
connection to any of the specified hosts", e);
+ if (eventListener != null) {
+ eventListener.stop();
if (binlogClient != null) {
- binlogClient.disconnect();
- binlogClient = null;
+ binlogClient.unregisterEventListener(eventListener);
}
- return;
}
+ if (binlogClient != null) {
+ binlogClient.disconnect();
+ binlogClient = null;
+ }
+ return;
}
gtidSet = new GtidSet(binlogClient.getGtidSet());
}
- public void outputEvents(ProcessSession session, ComponentLog log) throws
IOException {
+ public void outputEvents(ProcessSession session, ProcessContext context,
ComponentLog log) throws IOException {
RawBinlogEvent rawBinlogEvent;
+ DataCaptureState dataCaptureState = currentDataCaptureState.copy();
+ final boolean includeBeginCommit =
context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+ final boolean includeDDLEvents =
context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
// Drain the queue
while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) {
@@ -959,10 +910,10 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter
where we start (even from the end), and they won't have the correct "next
position" value, so only
// advance the position if it is not that type of event. ROTATE
events don't generate output CDC events and have the current binlog position in
a special field, which
// is filled in during the ROTATE case
- if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION &&
!useGtid) {
- currentBinlogPosition = header.getPosition();
+ if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION &&
!currentDataCaptureState.isUseGtid()) {
+ dataCaptureState.setBinlogPosition(header.getPosition());
}
- log.debug("Message event, type={} pos={} file={}", eventType,
currentBinlogPosition, currentBinlogFile);
+ log.debug("Message event, type={} pos={} file={}", eventType,
dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
switch (eventType) {
case TABLE_MAP:
// This is sent to inform which table is about to be
changed by subsequent events
@@ -974,139 +925,92 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
if (!skipTable) {
TableInfoCacheKey key = new
TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(),
data.getTableId());
- if (cacheClient != null) {
+
binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
+ if (binlogResourceInfo.getCurrentTable() == null) {
+ // We don't have an entry for this table yet, so
fetch the info from the database and populate the cache
try {
- currentTable = cacheClient.get(key,
cacheKeySerializer, cacheValueDeserializer);
- } catch (ConnectException ce) {
- throw new IOException("Could not connect to
Distributed Map Cache server to get table information", ce);
- }
-
- if (currentTable == null) {
- // We don't have an entry for this table yet,
so fetch the info from the database and populate the cache
- try {
- currentTable = loadTableInfo(key);
- try {
- cacheClient.put(key, currentTable,
cacheKeySerializer, cacheValueSerializer);
- } catch (ConnectException ce) {
- throw new IOException("Could not
connect to Distributed Map Cache server to put table information", ce);
- }
- } catch (SQLException se) {
- // Propagate the error up, so things like
rollback and logging/bulletins can be handled
- throw new IOException(se.getMessage(), se);
- }
+
binlogResourceInfo.setCurrentTable(loadTableInfo(key));
+ tableInfoCache.put(key,
binlogResourceInfo.getCurrentTable());
+ } catch (SQLException se) {
+ // Propagate the error up, so things like
rollback and logging/bulletins can be handled
+ throw new IOException(se.getMessage(), se);
}
- } else {
- // Populate a limited version of TableInfo without
column information
- currentTable = new
TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(),
Collections.emptyList());
}
} else {
- // Clear the current table, to force a reload next
time we get a TABLE_MAP event we care about
- currentTable = null;
+ // Clear the current table, to force reload next time
we get a TABLE_MAP event we care about
+ binlogResourceInfo.setCurrentTable(null);
}
break;
case QUERY:
QueryEventData queryEventData = event.getData();
- currentDatabase = queryEventData.getDatabase();
+
binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
String sql = queryEventData.getSql();
// Is this the start of a transaction?
if ("BEGIN".equals(sql)) {
// If we're already in a transaction, something bad
happened, alert the user
- if (inTransaction) {
+ if (binlogResourceInfo.isInTransaction()) {
getLogger().debug("BEGIN event received at pos={}
file={} while already processing a transaction. This could indicate that your
binlog position is invalid "
- + "or the event stream is out of sync or
there was an issue with the processor state.", currentBinlogPosition,
currentBinlogFile);
+ + "or the event stream is out of sync or
there was an issue with the processor state.",
dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
}
- // Mark the current binlog position and GTID in case
we have to rollback the transaction (if the processor is stopped, e.g.)
- xactBinlogFile = currentBinlogFile;
- xactBinlogPosition = currentBinlogPosition;
- xactSequenceId = currentSequenceId.get();
- xactGtidSet = currentGtidSet;
-
- if (includeBeginCommit && (databaseNamePattern == null
|| databaseNamePattern.matcher(currentDatabase).matches())) {
- BeginTransactionEventInfo beginEvent = useGtid
- ? new
BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
- : new
BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile,
currentBinlogPosition);
- currentEventInfo = beginEvent;
- currentEventWriter = beginEventWriter;
-
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri,
beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+ if (databaseNamePattern == null ||
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())
{
+ beginEventHandler.handleEvent(queryEventData,
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+ binlogEventState, sql,
eventWriterConfiguration, currentSession, timestamp);
}
- inTransaction = true;
- //update inTransaction value to state
- updateState(session);
+ // Whether we skip this event or not, it's still the
beginning of a transaction
+ binlogResourceInfo.setInTransaction(true);
+
+ // Update inTransaction value to state
+ updateState(session, dataCaptureState);
} else if ("COMMIT".equals(sql)) {
- if (!inTransaction) {
+ // InnoDB generates XID events for "commit", but
MyISAM generates Query events with "COMMIT", so handle that here
+ if (!binlogResourceInfo.isInTransaction()) {
getLogger().debug("COMMIT event received at pos={}
file={} while not processing a transaction (i.e. no corresponding BEGIN event).
"
+ "This could indicate that your binlog
position is invalid or the event stream is out of sync or there was an issue
with the processor state "
- + "or there was an issue with the
processor state.", currentBinlogPosition, currentBinlogFile);
+ + "or there was an issue with the
processor state.", dataCaptureState.getBinlogPosition(),
dataCaptureState.getBinlogFile());
}
- // InnoDB generates XID events for "commit", but
MyISAM generates Query events with "COMMIT", so handle that here
- if (includeBeginCommit) {
- if (databaseNamePattern == null ||
databaseNamePattern.matcher(currentDatabase).matches()) {
- CommitTransactionEventInfo
commitTransactionEvent = useGtid
- ? new
CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
- : new
CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile,
currentBinlogPosition);
- currentEventInfo = commitTransactionEvent;
- currentEventWriter = commitEventWriter;
-
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri,
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS,
eventWriterConfiguration));
- }
- } else {
- // If the COMMIT event is not to be written, the
FlowFile should still be finished and the session committed.
- if (currentSession != null) {
- FlowFile flowFile =
eventWriterConfiguration.getCurrentFlowFile();
- if (flowFile != null && currentEventWriter !=
null) {
- // Flush the events to the FlowFile when
the processor is stopped
-
currentEventWriter.finishAndTransferFlowFile(currentSession,
eventWriterConfiguration, transitUri, currentSequenceId.get(),
currentEventInfo, REL_SUCCESS);
- }
- currentSession.commitAsync();
- }
+ if (databaseNamePattern == null ||
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())
{
+ commitEventHandler.handleEvent(queryEventData,
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+ binlogEventState, sql,
eventWriterConfiguration, currentSession, timestamp);
}
-
- //update inTransaction value to state
- inTransaction = false;
- updateState(session);
+ // Whether we skip this event or not, it's the end of
a transaction
+ binlogResourceInfo.setInTransaction(false);
+ updateState(session, dataCaptureState);
// If there is no FlowFile open, commit the session
if (eventWriterConfiguration.getCurrentFlowFile() ==
null) {
// Commit the NiFi session
session.commitAsync();
}
- currentTable = null;
+ binlogResourceInfo.setCurrentTable(null);
+ binlogResourceInfo.setCurrentDatabase(null);
} else {
// Check for DDL events (alter table, e.g.). Normalize
the query to do string matching on the type of change
String normalizedQuery = normalizeQuery(sql);
- if (normalizedQuery.startsWith("alter table")
- || normalizedQuery.startsWith("alter ignore
table")
- || normalizedQuery.startsWith("create table")
- || normalizedQuery.startsWith("truncate table")
- || normalizedQuery.startsWith("rename table")
- || normalizedQuery.startsWith("drop table")
- || normalizedQuery.startsWith("drop
database")) {
-
- if (includeDDLEvents && (databaseNamePattern ==
null || databaseNamePattern.matcher(currentDatabase).matches())) {
- // If we don't have table information, we can
still use the database name
- TableInfo ddlTableInfo = (currentTable !=
null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
- DDLEventInfo ddlEvent = useGtid
- ? new DDLEventInfo(ddlTableInfo,
timestamp, currentGtidSet, sql)
- : new DDLEventInfo(ddlTableInfo,
timestamp, currentBinlogFile, currentBinlogPosition, sql);
- currentEventInfo = ddlEvent;
- currentEventWriter = ddlEventWriter;
-
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri,
ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
- }
- // Remove all the keys from the cache that this
processor added
- if (cacheClient != null) {
-
cacheClient.removeByPattern(this.getIdentifier() + ".*");
+ if (isQueryDDL(normalizedQuery)) {
+ if (databaseNamePattern == null ||
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())
{
+ if (queryEventData.getDatabase() == null) {
+
queryEventData.setDatabase(binlogResourceInfo.getCurrentDatabase());
+ }
+ ddlEventHandler.handleEvent(queryEventData,
includeDDLEvents, currentDataCaptureState, binlogResourceInfo,
+ binlogEventState, sql,
eventWriterConfiguration, currentSession, timestamp);
+
+ // The altered table may not be the "active"
table, so clear the cache to pick up changes
+ tableInfoCache.clear();
}
+
// If not in a transaction, commit the session so
the DDL event(s) will be transferred
- if (includeDDLEvents && !inTransaction) {
- updateState(session);
+ if (includeDDLEvents &&
!binlogResourceInfo.isInTransaction()) {
+ updateState(session, dataCaptureState);
if
(FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()))
{
if (currentSession != null) {
FlowFile flowFile =
eventWriterConfiguration.getCurrentFlowFile();
- if (flowFile != null &&
currentEventWriter != null) {
+ if (flowFile != null &&
binlogEventState.getCurrentEventWriter() != null) {
// Flush the events to the
FlowFile when the processor is stopped
-
currentEventWriter.finishAndTransferFlowFile(currentSession,
eventWriterConfiguration, transitUri, currentSequenceId.get(),
currentEventInfo, REL_SUCCESS);
+
binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(currentSession,
eventWriterConfiguration, binlogResourceInfo.getTransitUri(),
+
dataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(),
REL_SUCCESS);
}
}
}
@@ -1120,41 +1024,26 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
break;
case XID:
- if (!inTransaction) {
+ if (!binlogResourceInfo.isInTransaction()) {
getLogger().debug("COMMIT (XID) event received at
pos={} file={} /while not processing a transaction (i.e. no corresponding BEGIN
event). "
- + "This could indicate that your binlog
position is invalid or the event stream is out of sync or there was an issue
with the processor state.",
- currentBinlogPosition, currentBinlogFile);
+ + "This could indicate that your
binlog position is invalid or the event stream is out of sync or there was an
issue with the processor state.",
+ dataCaptureState.getBinlogPosition(),
dataCaptureState.getBinlogFile());
}
- if (includeBeginCommit) {
- if (databaseNamePattern == null ||
databaseNamePattern.matcher(currentDatabase).matches()) {
- CommitTransactionEventInfo commitTransactionEvent
= useGtid
- ? new
CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
- : new
CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile,
currentBinlogPosition);
- currentEventInfo = commitTransactionEvent;
- currentEventWriter = commitEventWriter;
-
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri,
commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS,
eventWriterConfiguration));
- }
- } else {
- // If the COMMIT event is not to be written, the
FlowFile should still be finished and the session committed.
- if (currentSession != null) {
- FlowFile flowFile =
eventWriterConfiguration.getCurrentFlowFile();
- if (flowFile != null && currentEventWriter !=
null) {
- // Flush the events to the FlowFile when the
processor is stopped
-
currentEventWriter.finishAndTransferFlowFile(currentSession,
eventWriterConfiguration, transitUri, currentSequenceId.get(),
currentEventInfo, REL_SUCCESS);
- }
- }
+ if (databaseNamePattern == null ||
databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches())
{
+ commitEventHandler.handleEvent(event.getData(),
includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
+ binlogEventState, null,
eventWriterConfiguration, currentSession, timestamp);
}
- // update inTransaction value and save next position
- // so when restart this processor,we will not read xid
event again
- inTransaction = false;
- currentBinlogPosition = header.getNextPosition();
- updateState(session);
+ // Whether we skip this event or not, it's the end of a
transaction
+ binlogResourceInfo.setInTransaction(false);
+
dataCaptureState.setBinlogPosition(header.getNextPosition());
+ updateState(session, dataCaptureState);
// If there is no FlowFile open, commit the session
if (eventWriterConfiguration.getCurrentFlowFile() == null)
{
+ // Commit the NiFi session
session.commitAsync();
}
- currentTable = null;
- currentDatabase = null;
+ binlogResourceInfo.setCurrentTable(null);
+ binlogResourceInfo.setCurrentDatabase(null);
break;
case WRITE_ROWS:
@@ -1170,11 +1059,11 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
if (skipTable) {
break;
}
- if (!inTransaction) {
+ if (!binlogResourceInfo.isInTransaction()) {
// These events should only happen inside a
transaction, warn the user otherwise
log.info("Event {} occurred outside of a transaction,
which is unexpected.", eventType.name());
}
- if (currentTable == null && cacheClient != null) {
+ if (binlogResourceInfo.getCurrentTable() == null) {
// No Table Map event was processed prior to this
event, which should not happen, so throw an error
throw new RowEventException("No table information is
available for this event, cannot process further.");
}
@@ -1183,52 +1072,42 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
|| eventType == EXT_WRITE_ROWS
|| eventType == PRE_GA_WRITE_ROWS) {
- InsertRowsEventInfo eventInfo = useGtid
- ? new InsertRowsEventInfo(currentTable,
timestamp, currentGtidSet, event.getData())
- : new InsertRowsEventInfo(currentTable,
timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentEventInfo = eventInfo;
- currentEventWriter = insertRowsWriter;
-
currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri,
eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+ insertEventHandler.handleEvent(event.getData(), true,
currentDataCaptureState, binlogResourceInfo,
+ binlogEventState, null,
eventWriterConfiguration, currentSession, timestamp);
} else if (eventType == DELETE_ROWS
|| eventType == EXT_DELETE_ROWS
|| eventType == PRE_GA_DELETE_ROWS) {
- DeleteRowsEventInfo eventInfo = useGtid
- ? new DeleteRowsEventInfo(currentTable,
timestamp, currentGtidSet, event.getData())
- : new DeleteRowsEventInfo(currentTable,
timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentEventInfo = eventInfo;
- currentEventWriter = deleteRowsWriter;
-
currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri,
eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
-
+ deleteEventHandler.handleEvent(event.getData(), true,
currentDataCaptureState, binlogResourceInfo,
+ binlogEventState, null,
eventWriterConfiguration, currentSession, timestamp);
} else {
// Update event
- UpdateRowsEventInfo eventInfo = useGtid
- ? new UpdateRowsEventInfo(currentTable,
timestamp, currentGtidSet, event.getData())
- : new UpdateRowsEventInfo(currentTable,
timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentEventInfo = eventInfo;
- currentEventWriter = updateRowsWriter;
-
currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri,
eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+ updateEventHandler.handleEvent(event.getData(), true,
currentDataCaptureState, binlogResourceInfo,
+ binlogEventState, null,
eventWriterConfiguration, currentSession, timestamp);
}
break;
case ROTATE:
- if (!useGtid) {
+ if (!currentDataCaptureState.isUseGtid()) {
// Update current binlog filename
RotateEventData rotateEventData = event.getData();
- currentBinlogFile =
rotateEventData.getBinlogFilename();
- currentBinlogPosition =
rotateEventData.getBinlogPosition();
+
dataCaptureState.setBinlogFile(rotateEventData.getBinlogFilename());
+
dataCaptureState.setBinlogPosition(rotateEventData.getBinlogPosition());
}
- updateState(session);
+ updateState(session, dataCaptureState);
break;
case GTID:
- if (useGtid) {
+ if (currentDataCaptureState.isUseGtid()) {
// Update current binlog gtid
GtidEventData gtidEventData = event.getData();
- gtidSet.add(gtidEventData.getGtid());
- currentGtidSet = gtidSet.toString();
- updateState(session);
+ MySqlGtid mySqlGtid = gtidEventData.getMySqlGtid();
+ if (mySqlGtid != null) {
+ gtidSet.add(mySqlGtid.toString());
+ dataCaptureState.setGtidSet(gtidSet.toString());
+ updateState(session, dataCaptureState);
+ }
}
break;
@@ -1239,12 +1118,22 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
// Advance the current binlog position. This way if no more events
are received and the processor is stopped, it will resume after the event that
was just processed.
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter
where we start (even from the end), and they won't have the correct "next
position" value, so only
// advance the position if it is not that type of event.
- if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION &&
!useGtid && eventType != XID) {
- currentBinlogPosition = header.getNextPosition();
+ if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION &&
!currentDataCaptureState.isUseGtid() && eventType != XID) {
+ dataCaptureState.setBinlogPosition(header.getNextPosition());
}
}
}
+ private boolean isQueryDDL(String sql) { // NOTE(review): true when sql begins with one of the seven DDL prefixes below; String.startsWith is case-sensitive, and the only visible caller passes the result of normalizeQuery(sql)
+ return sql.startsWith("alter table")
+ || sql.startsWith("alter ignore table")
+ || sql.startsWith("create table")
+ || sql.startsWith("truncate table")
+ || sql.startsWith("rename table")
+ || sql.startsWith("drop table")
+ || sql.startsWith("drop database"); // NOTE(review): same prefix list as the inline check this commit removes from outputEvents
+ }
+
protected void clearState() throws IOException {
if (currentSession == null) {
throw new IllegalStateException("No current session");
@@ -1263,7 +1152,8 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
return normalizedQuery;
}
- protected void stop() throws CDCException {
+ @OnStopped
+ public void stop() throws CDCException {
try {
if (eventListener != null) {
eventListener.stop();
@@ -1277,14 +1167,18 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
if (currentSession != null) {
FlowFile flowFile =
eventWriterConfiguration.getCurrentFlowFile();
- if (flowFile != null && currentEventWriter != null) {
+ if (flowFile != null &&
binlogEventState.getCurrentEventWriter() != null) {
// Flush the events to the FlowFile when the processor is
stopped
-
currentEventWriter.finishAndTransferFlowFile(currentSession,
eventWriterConfiguration, transitUri, currentSequenceId.get(),
currentEventInfo, REL_SUCCESS);
+
binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(
+ currentSession,
+ eventWriterConfiguration,
+ binlogResourceInfo.getTransitUri(),
+ currentDataCaptureState.getSequenceId(),
binlogEventState.getCurrentEventInfo(), REL_SUCCESS);
}
currentSession.commitAsync();
}
- currentBinlogPosition = -1;
+ currentDataCaptureState.setBinlogPosition(-1);
} catch (IOException e) {
throw new CDCException("Error closing CDC connection", e);
} finally {
@@ -1296,29 +1190,30 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
}
}
- private void updateState(ProcessSession session) throws IOException {
- updateState(session, currentBinlogFile, currentBinlogPosition,
currentSequenceId.get(), currentGtidSet, inTransaction);
+ private void updateState(ProcessSession session, DataCaptureState
dataCaptureState) throws IOException {
+ updateState(session, dataCaptureState,
binlogResourceInfo.isInTransaction()); // NOTE(review): convenience overload; delegates to the three-argument updateState, taking the in-transaction flag from binlogResourceInfo
}
- private void updateState(ProcessSession session, String binlogFile, long
binlogPosition, long sequenceId, String gtidSet, boolean inTransaction) throws
IOException {
+ private void updateState(ProcessSession session, DataCaptureState
dataCaptureState, boolean inTransaction) throws IOException { // NOTE(review): writes the given capture state plus the inTransaction flag into cluster-scoped state, then makes it the current capture state
// Update state with latest values
final Map<String, String> newStateMap = new
HashMap<>(session.getState(Scope.CLUSTER).toMap()); // NOTE(review): starts from a copy of the existing cluster state, so keys not written below are carried over unchanged
// Save current binlog filename, position and GTID to the state map
- if (binlogFile != null) {
- newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile);
+ if (dataCaptureState.getBinlogFile() != null) {
+ newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY,
dataCaptureState.getBinlogFile());
}
- newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY,
Long.toString(binlogPosition));
- newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(sequenceId));
+ newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY,
Long.toString(dataCaptureState.getBinlogPosition()));
+ newStateMap.put(SEQUENCE_ID_KEY,
String.valueOf(dataCaptureState.getSequenceId())); // NOTE(review): position and sequence id are always written; binlog file and GTID set only when non-null
//add inTransaction value into state
newStateMap.put("inTransaction", inTransaction ? "true" : "false");
- if (gtidSet != null) {
- newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+ if (dataCaptureState.getGtidSet() != null) {
+ newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY,
dataCaptureState.getGtidSet());
}
session.setState(newStateMap, Scope.CLUSTER);
+ currentDataCaptureState = dataCaptureState; // NOTE(review): stores the caller's object itself, not a copy; outputEvents passes the copy it made via currentDataCaptureState.copy(), so after this call its later setters on that copy also change currentDataCaptureState - confirm this aliasing is intended
}
@@ -1391,8 +1286,13 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
private JDBCConnectionHolder(InetSocketAddress host, String username,
String password, Map<String, String> customProperties, long
connectionTimeoutMillis) {
this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":"
+ host.getPort();
connectionProps.putAll(customProperties);
- connectionProps.put("user", username);
- connectionProps.put("password", password);
+ if (username != null) { // NOTE(review): "user" is now set only for a non-null username; the removed lines put both values unconditionally
+ connectionProps.put("user", username);
+ if (password != null) { // NOTE(review): nested inside the username check, so "password" is never set without a username
+ connectionProps.put("password", password);
+ }
+ }
+
this.connectionTimeoutMillis = connectionTimeoutMillis;
}
@@ -1486,7 +1386,71 @@ public class CaptureChangeMySQL extends
AbstractSessionFactoryProcessor {
public Logger getParentLogger() throws SQLFeatureNotSupportedException
{
return driver.getParentLogger();
}
+ }
+
+ public static class BinlogEventState { // NOTE(review): plain mutable holder for the current event info and event writer; outputEvents and stop() read both when finishing an open FlowFile
+ private BinlogEventInfo currentEventInfo;
+ private AbstractBinlogEventWriter<? extends BinlogEventInfo>
currentEventWriter; // NOTE(review): no initializer, so null until set; visible callers null-check getCurrentEventWriter() before use
+
+ public BinlogEventInfo getCurrentEventInfo() {
+ return currentEventInfo;
+ }
+ public void setCurrentEventInfo(BinlogEventInfo currentEventInfo) {
+ this.currentEventInfo = currentEventInfo;
+ }
+
+ public AbstractBinlogEventWriter<? extends BinlogEventInfo>
getCurrentEventWriter() {
+ return currentEventWriter;
+ }
+
+ public void setCurrentEventWriter(AbstractBinlogEventWriter<? extends
BinlogEventInfo> currentEventWriter) {
+ this.currentEventWriter = currentEventWriter; // NOTE(review): no setter call is visible in this file's hunks; presumably invoked by the event handlers that receive binlogEventState - confirm
+ }
}
+ public static class BinlogResourceInfo { // NOTE(review): plain mutable holder for the current table, current database, in-transaction flag and transit URI that this commit moves out of individual processor fields
+ private TableInfo currentTable = null;
+ private String currentDatabase = null;
+
+ private boolean inTransaction = false;
+
+ private String transitUri = "<unknown>"; // NOTE(review): same placeholder the connect path sets when a host connection attempt fails
+
+ public BinlogResourceInfo() { // NOTE(review): explicit empty constructor; fields keep the initial values declared above
+
+ }
+
+ public TableInfo getCurrentTable() {
+ return currentTable;
+ }
+
+ public void setCurrentTable(TableInfo currentTable) {
+ this.currentTable = currentTable;
+ }
+
+ public String getCurrentDatabase() {
+ return currentDatabase;
+ }
+
+ public void setCurrentDatabase(String currentDatabase) {
+ this.currentDatabase = currentDatabase;
+ }
+
+ public boolean isInTransaction() {
+ return inTransaction;
+ }
+
+ public void setInTransaction(boolean inTransaction) {
+ this.inTransaction = inTransaction;
+ }
+
+ public String getTransitUri() {
+ return transitUri;
+ }
+
+ public void setTransitUri(String transitUri) {
+ this.transitUri = transitUri;
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 83c3a66f69..20523cbc06 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -23,6 +23,7 @@ import com.github.shyiko.mysql.binlog.event.EventData
import com.github.shyiko.mysql.binlog.event.EventHeaderV4
import com.github.shyiko.mysql.binlog.event.EventType
import com.github.shyiko.mysql.binlog.event.GtidEventData
+import com.github.shyiko.mysql.binlog.event.MySqlGtid
import com.github.shyiko.mysql.binlog.event.QueryEventData
import com.github.shyiko.mysql.binlog.event.RotateEventData
import com.github.shyiko.mysql.binlog.event.TableMapEventData
@@ -30,7 +31,6 @@ import
com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
import com.github.shyiko.mysql.binlog.network.SSLMode
import groovy.json.JsonSlurper
-import org.apache.commons.io.output.WriterOutputStream
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading
import org.apache.nifi.cdc.event.ColumnDefinition
import org.apache.nifi.cdc.event.TableInfo
@@ -40,22 +40,12 @@ import
org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
-import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.state.Scope
-import org.apache.nifi.controller.AbstractControllerService
-import org.apache.nifi.distributed.cache.client.Deserializer
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
-import
org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
-import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.flowfile.attributes.CoreAttributes
-import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.ssl.SSLContextService
-import org.apache.nifi.state.MockStateManager
-import org.apache.nifi.util.MockComponentLog
-import org.apache.nifi.util.MockControllerServiceInitializationContext
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
@@ -67,8 +57,6 @@ import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
import java.util.concurrent.TimeoutException
-import java.util.regex.Matcher
-import java.util.regex.Pattern
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNotNull
@@ -378,13 +366,6 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
- final DistributedMapCacheClientImpl cacheClient = createCacheClient()
- def clientProperties = [:]
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
'localhost')
- testRunner.addControllerService('client', cacheClient,
clientProperties)
- testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
- testRunner.enableControllerService(cacheClient)
-
testRunner.run(1, false, true)
@@ -521,7 +502,7 @@ class CaptureChangeMySQLTest {
@Test
void testExcludeSchemaChanges() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION,
DRIVER_LOCATION)
- testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Don't
include port here, should default to 3306
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
@@ -530,13 +511,6 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'false')
- final DistributedMapCacheClientImpl cacheClient = createCacheClient()
- def clientProperties = [:]
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
'localhost')
- testRunner.addControllerService('client', cacheClient,
clientProperties)
- testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
- testRunner.enableControllerService(cacheClient)
-
testRunner.run(1, false, true)
@@ -595,16 +569,10 @@ class CaptureChangeMySQLTest {
@Test
void testNoTableInformationAvailable() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION,
DRIVER_LOCATION)
- testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+ testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Port
should default to 3306
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
- final DistributedMapCacheClientImpl cacheClient = createCacheClient()
- def clientProperties = [:]
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
'localhost')
- testRunner.addControllerService('client', cacheClient,
clientProperties)
- testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
- testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
@@ -1106,12 +1074,7 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
- final DistributedMapCacheClientImpl cacheClient = createCacheClient()
- def clientProperties = [:]
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
'localhost')
- testRunner.addControllerService('client', cacheClient,
clientProperties)
- testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
- testRunner.enableControllerService(cacheClient)
+
testRunner.run(1, false, true)
// ROTATE
@@ -1174,19 +1137,13 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
- final DistributedMapCacheClientImpl cacheClient = createCacheClient()
- def clientProperties = [:]
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
'localhost')
- testRunner.addControllerService('client', cacheClient,
clientProperties)
- testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
- testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID,
nextPosition: 2] as EventHeaderV4,
- [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1'] as
GtidEventData
+ [gtid: MySqlGtid.fromString(
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')] as GtidEventData
))
// BEGIN
@@ -1206,7 +1163,7 @@ class CaptureChangeMySQLTest {
// Stop the processor and verify the state is set
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY,
'', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY,
'6', Scope.CLUSTER)
-
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY,
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER)
+
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY,
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1', Scope.CLUSTER)
((CaptureChangeMySQL) testRunner.getProcessor()).clearState()
testRunner.stateManager.clear(Scope.CLUSTER)
@@ -1218,7 +1175,7 @@ class CaptureChangeMySQLTest {
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID,
nextPosition: 8] as EventHeaderV4,
- [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2'] as
GtidEventData
+ [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2'] as
GtidEventData
))
// BEGIN
@@ -1239,12 +1196,12 @@ class CaptureChangeMySQLTest {
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY,
'', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY,
'12', Scope.CLUSTER)
-
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY,
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-2', Scope.CLUSTER)
+
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY,
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-2', Scope.CLUSTER)
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID,
nextPosition: 14] as EventHeaderV4,
- [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as
GtidEventData
+ [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as
GtidEventData
))
// BEGIN
@@ -1263,7 +1220,7 @@ class CaptureChangeMySQLTest {
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY,
'', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY,
'18', Scope.CLUSTER)
-
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY,
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', Scope.CLUSTER)
+
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY,
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3', Scope.CLUSTER)
}
@Test
@@ -1340,12 +1297,12 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
- testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID,
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+ testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID,
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')
testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')
testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS,
'false')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.getStateManager().setState([
- ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()):
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2',
+ ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()):
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2',
("${EventWriter.SEQUENCE_ID_KEY}".toString()): '1'
], Scope.CLUSTER)
@@ -1354,7 +1311,7 @@ class CaptureChangeMySQLTest {
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID,
nextPosition: 2] as EventHeaderV4,
- [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as
GtidEventData
+ [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as
GtidEventData
))
// BEGIN
@@ -1375,7 +1332,7 @@ class CaptureChangeMySQLTest {
assertEquals(2, resultFiles.size())
assertEquals(
- 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3',
+ 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3',
resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
)
}
@@ -1388,7 +1345,7 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
- testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID,
'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1')
+ testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID,
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')
testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS,
'false')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
@@ -1397,7 +1354,7 @@ class CaptureChangeMySQLTest {
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID,
nextPosition: 2] as EventHeaderV4,
- [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as
GtidEventData
+ [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as
GtidEventData
))
// BEGIN
@@ -1418,7 +1375,7 @@ class CaptureChangeMySQLTest {
assertEquals(2, resultFiles.size())
assertEquals(
- 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1:3-3',
+ 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1:3-3',
resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
)
}
@@ -1430,12 +1387,6 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root")
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds")
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true")
- final DistributedMapCacheClientImpl cacheClient = createCacheClient()
- Map<String, String> clientProperties = new HashMap<>()
-
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(),
"localhost")
- testRunner.addControllerService("client", cacheClient,
clientProperties)
- testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client")
- testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
// COMMIT
@@ -1496,131 +1447,5 @@ class CaptureChangeMySQLTest {
when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet)
return mockConnection
}
-
- }
-
- static DistributedMapCacheClientImpl createCacheClient() throws
InitializationException {
-
- final DistributedMapCacheClientImpl client = new
DistributedMapCacheClientImpl()
- final ComponentLog logger = new MockComponentLog("client", client)
- final MockControllerServiceInitializationContext clientInitContext =
new MockControllerServiceInitializationContext(client, "client", logger, new
MockStateManager(client))
-
- client.initialize(clientInitContext)
-
- return client
- }
-
- static
- final class DistributedMapCacheClientImpl extends
AbstractControllerService implements DistributedMapCacheClient {
-
- private Map<String, String> cacheMap = new HashMap<>()
-
- @Override
- void close() throws IOException {
- }
-
- @Override
- void onPropertyModified(final PropertyDescriptor descriptor, final
String oldValue, final String newValue) {
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-
- return [DistributedMapCacheClientService.HOSTNAME,
- DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
- DistributedMapCacheClientService.PORT,
- DistributedMapCacheClientService.SSL_CONTEXT_SERVICE]
- }
-
- @Override
- <K, V> boolean putIfAbsent(
- final K key,
- final V value,
- final Serializer<K> keySerializer, final Serializer<V>
valueSerializer) throws IOException {
-
- StringWriter keyWriter = new StringWriter()
- keySerializer.serialize(key, new WriterOutputStream(keyWriter))
- String keyString = keyWriter.toString()
-
- if (cacheMap.containsKey(keyString)) return false
-
- StringWriter valueWriter = new StringWriter()
- valueSerializer.serialize(value, new
WriterOutputStream(valueWriter))
- return true
- }
-
- @Override
- @SuppressWarnings("unchecked")
- <K, V> V getAndPutIfAbsent(
- final K key, final V value, final Serializer<K> keySerializer,
final Serializer<V> valueSerializer,
- final Deserializer<V> valueDeserializer) throws IOException {
- StringWriter keyWriter = new StringWriter()
- keySerializer.serialize(key, new WriterOutputStream(keyWriter))
- String keyString = keyWriter.toString()
-
- if (cacheMap.containsKey(keyString)) return
valueDeserializer.deserialize(cacheMap.get(keyString).bytes)
-
- StringWriter valueWriter = new StringWriter()
- valueSerializer.serialize(value, new
WriterOutputStream(valueWriter))
- return null
- }
-
- @Override
- <K> boolean containsKey(final K key, final Serializer<K>
keySerializer) throws IOException {
- StringWriter keyWriter = new StringWriter()
- keySerializer.serialize(key, new WriterOutputStream(keyWriter))
- String keyString = keyWriter.toString()
-
- return cacheMap.containsKey(keyString)
- }
-
- @Override
- <K, V> V get(
- final K key,
- final Serializer<K> keySerializer, final Deserializer<V>
valueDeserializer) throws IOException {
- StringWriter keyWriter = new StringWriter()
- keySerializer.serialize(key, new WriterOutputStream(keyWriter))
- String keyString = keyWriter.toString()
-
- return (cacheMap.containsKey(keyString)) ?
valueDeserializer.deserialize(cacheMap.get(keyString).bytes) : null
- }
-
- @Override
- <K> boolean remove(final K key, final Serializer<K> serializer) throws
IOException {
- StringWriter keyWriter = new StringWriter()
- serializer.serialize(key, new WriterOutputStream(keyWriter))
- String keyString = keyWriter.toString()
-
- boolean removed = (cacheMap.containsKey(keyString))
- cacheMap.remove(keyString)
- return removed
- }
-
- @Override
- long removeByPattern(String regex) throws IOException {
- final List<String> removedRecords = new ArrayList<>()
- Pattern p = Pattern.compile(regex)
- for (String key : cacheMap.keySet()) {
- // Key must be backed by something that can be converted into
a String
- Matcher m = p.matcher(key)
- if (m.matches()) {
- removedRecords.add(cacheMap.get(key))
- }
- }
- final long numRemoved = removedRecords.size()
- removedRecords.each {cacheMap.remove(it)}
- return numRemoved
- }
-
- @Override
- <K, V> void put(
- final K key,
- final V value,
- final Serializer<K> keySerializer, final Serializer<V>
valueSerializer) throws IOException {
- StringWriter keyWriter = new StringWriter()
- keySerializer.serialize(key, new WriterOutputStream(keyWriter))
- StringWriter valueWriter = new StringWriter()
- valueSerializer.serialize(value, new
WriterOutputStream(valueWriter))
- }
}
}
diff --git
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java
similarity index 65%
rename from
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
rename to
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java
index 73573a2bd5..a34565ddf6 100644
---
a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java
+++
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java
@@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nifi.cdc.mysql.event;
+package org.apache.nifi.cdc.mysql.event.io;
+
import org.junit.jupiter.api.Test;
@@ -23,17 +24,15 @@ import java.sql.Types;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+class TestInsertRowsWriter {
-/**
- * Unit Tests for MySQLCDCUtils utility class
- */
-public class MySQLCDCUtilsTest {
@Test
- public void testGetWritableObject() throws Exception {
- assertNull(MySQLCDCUtils.getWritableObject(null, null));
- assertNull(MySQLCDCUtils.getWritableObject(Types.INTEGER, null));
- assertEquals((byte) 1, MySQLCDCUtils.getWritableObject(Types.INTEGER,
(byte) 1));
- assertEquals("Hello", MySQLCDCUtils.getWritableObject(Types.VARCHAR,
"Hello".getBytes()));
+ public void testGetWritableObject() {
+ InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
+ assertNull(insertRowsWriter.getWritableObject(null, null));
+ assertNull(insertRowsWriter.getWritableObject(Types.INTEGER, null));
+ assertEquals((byte) 1,
insertRowsWriter.getWritableObject(Types.INTEGER, (byte) 1));
+ assertEquals("Hello",
insertRowsWriter.getWritableObject(Types.VARCHAR, "Hello".getBytes()));
}
}
\ No newline at end of file