NIFI-3413: Add GetChangeDataCaptureMySQL processor NIFI-3413: Incorporated review comments NIFI-3413: Changed GetChangeDataCaptureMySQL to CaptureChangeMySQL, fixed some bugs NIFI-3413: Refactored setup() for better error handling, more review comments incorporated NIFI-3413: Refactored CDC into its own module(s), updated assembly and top-level POMs NIFI-3413: Added RECEIVE prov event and Server ID property
Signed-off-by: ijokarumawak <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8f37ad45 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8f37ad45 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8f37ad45 Branch: refs/heads/master Commit: 8f37ad4512fdca1627938b9ae4fd40b02ab2999d Parents: 3386839 Author: Matt Burgess <[email protected]> Authored: Thu Mar 23 18:43:04 2017 -0400 Committer: ijokarumawak <[email protected]> Committed: Fri Apr 7 00:44:42 2017 +0900 ---------------------------------------------------------------------- nifi-assembly/pom.xml | 5 + nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml | 43 + .../java/org/apache/nifi/cdc/CDCException.java | 42 + .../apache/nifi/cdc/event/BaseEventInfo.java | 42 + .../apache/nifi/cdc/event/BaseRowEventInfo.java | 37 + .../nifi/cdc/event/BaseTableEventInfo.java | 65 ++ .../apache/nifi/cdc/event/ColumnDefinition.java | 73 ++ .../org/apache/nifi/cdc/event/EventInfo.java | 36 + .../nifi/cdc/event/RowEventException.java | 40 + .../org/apache/nifi/cdc/event/RowEventInfo.java | 27 + .../apache/nifi/cdc/event/TableEventInfo.java | 31 + .../org/apache/nifi/cdc/event/TableInfo.java | 155 +++ .../nifi/cdc/event/TableInfoCacheKey.java | 98 ++ .../nifi/cdc/event/io/AbstractEventWriter.java | 64 ++ .../apache/nifi/cdc/event/io/EventWriter.java | 43 + .../nifi-cdc-mysql-nar/pom.xml | 40 + .../src/main/resources/META-INF/NOTICE | 5 + .../nifi-cdc-mysql-processors/pom.xml | 67 ++ .../cdc/mysql/event/BaseBinlogEventInfo.java | 42 + .../cdc/mysql/event/BaseBinlogRowEventInfo.java | 48 + .../mysql/event/BaseBinlogTableEventInfo.java | 62 ++ .../mysql/event/BeginTransactionEventInfo.java | 27 + .../nifi/cdc/mysql/event/BinlogEventInfo.java | 32 + .../cdc/mysql/event/BinlogEventListener.java | 65 ++ .../mysql/event/BinlogLifecycleListener.java | 61 ++ .../cdc/mysql/event/BinlogTableEventInfo.java | 25 + .../mysql/event/CommitTransactionEventInfo.java | 28 + .../cdc/mysql/event/DeleteRowsEventInfo.java | 32 + .../cdc/mysql/event/InsertRowsEventInfo.java | 35 + .../nifi/cdc/mysql/event/MySQLCDCUtils.java | 47 + .../nifi/cdc/mysql/event/RawBinlogEvent.java | 45 + .../cdc/mysql/event/SchemaChangeEventInfo.java | 38 + .../cdc/mysql/event/UpdateRowsEventInfo.java | 41 + .../event/io/AbstractBinlogEventWriter.java | 67 ++ .../io/AbstractBinlogTableEventWriter.java | 65 ++ .../event/io/BeginTransactionEventWriter.java | 25 + .../event/io/CommitTransactionEventWriter.java | 27 + .../cdc/mysql/event/io/DeleteRowsWriter.java | 92 ++ .../cdc/mysql/event/io/InsertRowsWriter.java | 92 ++ .../mysql/event/io/SchemaChangeEventWriter.java | 43 + .../cdc/mysql/event/io/UpdateRowsWriter.java | 103 ++ .../mysql/processors/CaptureChangeMySQL.java | 1033 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 15 + .../nifi/cdc/mysql/MockBinlogClient.groovy | 97 ++ .../processors/CaptureChangeMySQLTest.groovy | 795 ++++++++++++++ .../nifi/cdc/mysql/event/MySQLCDCUtilsTest.java | 39 + .../nifi-cdc/nifi-cdc-mysql-bundle/pom.xml | 41 + nifi-nar-bundles/nifi-cdc/pom.xml | 29 + nifi-nar-bundles/pom.xml | 1 + pom.xml | 6 + 50 files changed, 4111 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 7751666..fd5ce78 100755 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -458,6 +458,11 @@ language governing permissions and limitations under the License. --> <artifactId>nifi-stateful-analysis-nar</artifactId> <type>nar</type> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cdc-mysql-nar</artifactId> + <type>nar</type> + </dependency> </dependencies> <profiles> <profile> http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml new file mode 100644 index 0000000..bf24dbb --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/pom.xml @@ -0,0 +1,43 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cdc</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + + <artifactId>nifi-cdc-api</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java new file mode 100644 index 0000000..7356e4c --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/CDCException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc; + +/** + * An exception class representing errors that occur during Change Data Capture (CDC) processing + */ +public class CDCException extends Exception { + + public CDCException() { + } + + public CDCException(String message) { + super(message); + } + + public CDCException(String message, Throwable cause) { + super(message, cause); + } + + public CDCException(Throwable cause) { + super(cause); + } + + public CDCException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java new file mode 100644 index 0000000..13307bb --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseEventInfo.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + + +/** + * A base class for all MySQL binlog events + */ +public class BaseEventInfo implements EventInfo { + + private final String eventType; + private final Long timestamp; + + public BaseEventInfo(String eventType, Long timestamp) { + this.eventType = eventType; + this.timestamp = timestamp; + } + + + public String getEventType() { + return eventType; + } + + public Long getTimestamp() { + return timestamp; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java new file mode 100644 index 0000000..1107a91 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseRowEventInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + + +import java.util.List; + +/** + * A base class to use for representing information to row mutation events + */ +public class BaseRowEventInfo<RowEventDataType> extends BaseTableEventInfo implements RowEventInfo<RowEventDataType> { + + protected List<RowEventDataType> rows; + + public BaseRowEventInfo(TableInfo tableInfo, String eventType, Long timestamp, List<RowEventDataType> rows) { + super(tableInfo, eventType, timestamp); + this.rows = rows; + } + + public List<RowEventDataType> getRows() { + return rows; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java new file mode 100644 index 0000000..bc34608 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/BaseTableEventInfo.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + +import java.util.List; + +/** + * An abstract base class for all MySQL binlog events affecting a table. + */ +public class BaseTableEventInfo extends BaseEventInfo implements TableEventInfo { + + private String databaseName; + private String tableName; + private Long tableId; + + private List<ColumnDefinition> columns; + + public BaseTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp) { + super(eventType, timestamp); + if (tableInfo != null) { + this.databaseName = tableInfo.getDatabaseName(); + this.tableName = tableInfo.getTableName(); + this.tableId = tableInfo.getTableId(); + this.columns = tableInfo.getColumns(); + } + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public Long getTableId() { + return tableId; + } + + public List<ColumnDefinition> getColumns() { + return columns; + } + + public ColumnDefinition getColumnByIndex(int i) { + try { + return columns.get(i); + } catch (IndexOutOfBoundsException | NullPointerException e) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java new file mode 100644 index 0000000..c3f14ec --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + +import org.apache.commons.lang3.builder.EqualsBuilder; + +/** + * A class that specifies a definition for a relational table column, including type, name, etc. + */ +public class ColumnDefinition { + + private int type; + private String name = ""; + + public ColumnDefinition(int type) { + this.type = type; + } + + public ColumnDefinition(int type, String name) { + this(type); + this.name = name; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ColumnDefinition that = (ColumnDefinition) o; + + return new EqualsBuilder() + .append(type, that.type) + .append(name, that.name) + .isEquals(); + } + + @Override + public int hashCode() { + int result = type; + result = 31 * result + (name != null ? name.hashCode() : 0); + return result; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java new file mode 100644 index 0000000..7608cc2 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + + +/** + * An interface representing a data structure containing event information, and serialization/deserlization methods. + */ +public interface EventInfo { + + // Event type constants + String BEGIN_EVENT = "begin"; + String COMMIT_EVENT = "commit"; + String WRITE_EVENT = "write"; + String DELETE_EVENT = "delete"; + String UPDATE_EVENT = "update"; + String SCHEMA_CHANGE = "schema_change"; + + String getEventType(); + + Long getTimestamp(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java new file mode 100644 index 0000000..94f2ef5 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + +import java.io.IOException; + +/** + * A marker class for Exceptions that occur during the handling of CDC row modification events + */ +public class RowEventException extends IOException { + + public RowEventException() { + } + + public RowEventException(String message) { + super(message); + } + + public RowEventException(String message, Throwable cause) { + super(message, cause); + } + + public RowEventException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java new file mode 100644 index 0000000..b02b25b --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/RowEventInfo.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + +import java.util.List; + +/** + * An interface corresponding to row-level info from events on tabular data structures. + */ +public interface RowEventInfo<RowEventDataType> extends TableEventInfo { + + List<RowEventDataType> getRows(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java new file mode 100644 index 0000000..7754225 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableEventInfo.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + +import java.util.List; + +/** + * An interface for methods related to events that occur on tabular data structures. + */ +public interface TableEventInfo extends EventInfo { + + String getDatabaseName(); + String getTableName(); + Long getTableId(); + List<ColumnDefinition> getColumns(); + ColumnDefinition getColumnByIndex(int i); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java new file mode 100644 index 0000000..d59306b --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * A POJO for holding table information related to update events. + */ +public class TableInfo { + + final static String DB_TABLE_NAME_DELIMITER = "@!@"; + + private String databaseName; + private String tableName; + private Long tableId; + private List<ColumnDefinition> columns; + + public TableInfo(String databaseName, String tableName, Long tableId, List<ColumnDefinition> columns) { + this.databaseName = databaseName; + this.tableName = tableName; + this.tableId = tableId; + this.columns = columns; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public Long getTableId() { + return tableId; + } + + public List<ColumnDefinition> getColumns() { + return columns; + } + + public void setColumns(List<ColumnDefinition> columns) { + this.columns = columns; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TableInfo that = (TableInfo) o; + + return new EqualsBuilder() + .append(databaseName, that.databaseName) + .append(tableName, that.tableName) + .append(tableId, that.tableId) + .append(columns, that.columns) + .isEquals(); + } + + @Override + public int hashCode() { + int result = databaseName.hashCode(); + result = 31 * result + tableName.hashCode(); + result = 31 * result + tableId.hashCode(); + result = 31 * result + (columns != null ? columns.hashCode() : 0); + return result; + } + + public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer<TableInfo> { + + @Override + public void serialize(TableInfo value, OutputStream output) throws SerializationException, IOException { + StringBuilder sb = new StringBuilder(value.getDatabaseName()); + sb.append(DB_TABLE_NAME_DELIMITER); + sb.append(value.getTableName()); + sb.append(DB_TABLE_NAME_DELIMITER); + sb.append(value.getTableId()); + List<ColumnDefinition> columnDefinitions = value.getColumns(); + if (columnDefinitions != null && !columnDefinitions.isEmpty()) { + sb.append(DB_TABLE_NAME_DELIMITER); + sb.append(columnDefinitions.stream().map((col) -> col.getName() + DB_TABLE_NAME_DELIMITER + col.getType()).collect(Collectors.joining(DB_TABLE_NAME_DELIMITER))); + } + output.write(sb.toString().getBytes()); + } + } + + public static class Deserializer implements org.apache.nifi.distributed.cache.client.Deserializer<TableInfo> { + + @Override + public TableInfo deserialize(byte[] input) throws DeserializationException, IOException { + // Don't bother deserializing if empty, just return null. This usually happens when the key is not found in the cache + if (input == null || input.length == 0) { + return null; + } + String inputString = new String(input); + String[] tokens = inputString.split(DB_TABLE_NAME_DELIMITER); + int numTokens = tokens.length; + if (numTokens < 3) { + throw new IOException("Could not deserialize TableInfo from the following value: " + inputString); + } + String dbName = tokens[0]; + String tableName = tokens[1]; + Long tableId; + try { + tableId = Long.parseLong(tokens[2]); + } catch (NumberFormatException nfe) { + throw new IOException("Illegal table ID: " + tokens[2]); + } + // Parse column names and types + List<ColumnDefinition> columnDefinitions = new ArrayList<>(); + for (int i = 0; i < numTokens - 3; i += 2) { + try { + int columnTypeIndex = i + 4; + int columnNameIndex = i + 3; + if (columnTypeIndex < numTokens) { + columnDefinitions.add(new ColumnDefinition(Integer.parseInt(tokens[columnTypeIndex]), tokens[columnNameIndex])); + } else { + throw new IOException("No type detected for column: " + tokens[columnNameIndex]); + } + } catch (NumberFormatException nfe) { + throw new IOException("Illegal column type value for column " + (i / 2 + 1) + ": " + tokens[i + 4]); + } + } + + return new TableInfo(dbName, tableName, tableId, columnDefinitions); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java new file mode 100644 index 0000000..2f185e9 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; + +import java.io.IOException; +import java.io.OutputStream; + +import static org.apache.nifi.cdc.event.TableInfo.DB_TABLE_NAME_DELIMITER; + +/** + * This class represents a key in a cache that contains information (column definitions, e.g.) for a database table + */ +public class TableInfoCacheKey { + + private final String databaseName; + private final String tableName; + private final long tableId; + private final String uuidPrefix; + + public TableInfoCacheKey(String uuidPrefix, String databaseName, String tableName, long tableId) { + this.uuidPrefix = uuidPrefix; + this.databaseName = databaseName; + this.tableName = tableName; + this.tableId = tableId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TableInfoCacheKey that = (TableInfoCacheKey) o; + + return new EqualsBuilder() + .append(tableId, that.tableId) + .append(databaseName, that.databaseName) + .append(tableName, that.tableName) + .append(uuidPrefix, that.uuidPrefix) + .isEquals(); + } + + @Override + public int hashCode() { + int result = databaseName != null ? databaseName.hashCode() : 0; + result = 31 * result + (tableName != null ? tableName.hashCode() : 0); + result = 31 * result + (int) (tableId ^ (tableId >>> 32)); + result = 31 * result + (uuidPrefix != null ? uuidPrefix.hashCode() : 0); + return result; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getTableName() { + return tableName; + } + + public long getTableId() { + return tableId; + } + + public String getUuidPrefix() { + return uuidPrefix; + } + + public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer<TableInfoCacheKey> { + + @Override + public void serialize(TableInfoCacheKey key, OutputStream output) throws SerializationException, IOException { + StringBuilder sb = new StringBuilder(key.getUuidPrefix()); + sb.append(DB_TABLE_NAME_DELIMITER); + sb.append(key.getDatabaseName()); + sb.append(DB_TABLE_NAME_DELIMITER); + sb.append(key.getTableName()); + sb.append(DB_TABLE_NAME_DELIMITER); + sb.append(key.getTableId()); + output.write(sb.toString().getBytes()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java new file mode 100644 index 0000000..c805d3c --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event.io; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.nifi.cdc.event.EventInfo; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * An abstract class that outputs common information (event type, timestamp, e.g.) about CDC events. + */ +public abstract class AbstractEventWriter<T extends EventInfo> implements EventWriter<T> { + + private final JsonFactory JSON_FACTORY = new JsonFactory(); + protected JsonGenerator jsonGenerator; + + // Common method to create a JSON generator and start the root object. Should be called by sub-classes unless they need their own generator and such. + protected void startJson(OutputStream outputStream, T event) throws IOException { + jsonGenerator = createJsonGenerator(outputStream); + jsonGenerator.writeStartObject(); + String eventType = event.getEventType(); + if (eventType == null) { + jsonGenerator.writeNullField("type"); + } else { + jsonGenerator.writeStringField("type", eventType); + } + Long timestamp = event.getTimestamp(); + if (timestamp == null) { + jsonGenerator.writeNullField("timestamp"); + } else { + jsonGenerator.writeNumberField("timestamp", event.getTimestamp()); + } + } + + protected void endJson() throws IOException { + if (jsonGenerator == null) { + throw new IOException("endJson called without a JsonGenerator"); + } + jsonGenerator.writeEndObject(); + jsonGenerator.flush(); + jsonGenerator.close(); + } + + private JsonGenerator createJsonGenerator(OutputStream out) throws IOException { + return JSON_FACTORY.createGenerator(out); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java new file mode 100644 index 0000000..096e3c1 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event.io; + +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.cdc.event.EventInfo; +import org.apache.nifi.processor.Relationship; + +/** + * An interface to write an event to the process session. Note that a single event may produce multiple flow files. + */ +public interface EventWriter<T extends EventInfo> { + + String APPLICATION_JSON = "application/json"; + String SEQUENCE_ID_KEY = "cdc.sequence.id"; + String CDC_EVENT_TYPE_ATTRIBUTE = "cdc.event.type"; + + /** + * Writes the given event to the process session, possibly via transferring it to the specified relationship (usually used for success) + * + * @param session The session to write the event to + * @param transitUri The URI indicating the source MySQL system from which the specified event is associated + * @param eventInfo The event data + * @param currentSequenceId the current sequence ID + * @param relationship A relationship to transfer any flowfile(s) to + * @return a sequence ID, usually incremented from the specified current sequence id by the number of flow files transferred and/or committed + */ + long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml new file mode 100644 index 0000000..a42b49d --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/pom.xml @@ -0,0 +1,40 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cdc-mysql-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-cdc-mysql-nar</artifactId> + <packaging>nar</packaging> + <description>NiFi MySQL Change Data Capture (CDC) NAR</description> + <properties> + <maven.javadoc.skip>true</maven.javadoc.skip> + <source.skip>true</source.skip> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cdc-mysql-processors</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000..7d59e9d --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,5 @@ +nifi-cdc-mysql-nar +Copyright 2014-2017 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml new file mode 100644 index 0000000..13aea10 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml @@ -0,0 +1,67 @@ +<?xml version="1.0"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor +license agreements. See the NOTICE file distributed with this work for additional +information regarding copyright ownership. The ASF licenses this file to +You under the Apache License, Version 2.0 (the "License"); you may not use +this file except in compliance with the License. You may obtain a copy of +the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required +by applicable law or agreed to in writing, software distributed under the +License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS +OF ANY KIND, either express or implied. See the License for the specific +language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cdc-mysql-bundle</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-cdc-mysql-processors</artifactId> + <packaging>jar</packaging> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-cdc-api</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service-api</artifactId> + </dependency> + <dependency> + <groupId>com.github.shyiko</groupId> + <artifactId>mysql-binlog-connector-java</artifactId> + <version>0.11.0</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-distributed-cache-client-service</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service</artifactId> + <version>1.2.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java new file mode 100644 index 0000000..7089cda --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogEventInfo.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import org.apache.nifi.cdc.event.BaseEventInfo; + +/** + * A base class for all MYSQL binlog events + */ +public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInfo { + + private String binlogFilename; + private Long binlogPosition; + + public BaseBinlogEventInfo(String eventType, Long timestamp, String binlogFilename, Long binlogPosition) { + super(eventType, timestamp); + this.binlogFilename = binlogFilename; + this.binlogPosition = binlogPosition; + } + + public String getBinlogFilename() { + return binlogFilename; + } + + public Long getBinlogPosition() { + return binlogPosition; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java new file mode 100644 index 0000000..4796947 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogRowEventInfo.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import org.apache.nifi.cdc.event.BaseRowEventInfo; +import org.apache.nifi.cdc.event.RowEventInfo; +import org.apache.nifi.cdc.event.TableInfo; + +import java.util.BitSet; +import java.util.List; + +/** + * A base class to help store information about a row mutation event (UPDATE, DELETE, etc.) + */ +public class BaseBinlogRowEventInfo<RowEventDataType> extends BaseBinlogTableEventInfo implements RowEventInfo<RowEventDataType> { + + private BitSet includedColumns; + private RowEventInfo<RowEventDataType> delegate; + + public BaseBinlogRowEventInfo(TableInfo tableInfo, String type, Long timestamp, String binlogFilename, Long binlogPosition, BitSet includedColumns, List<RowEventDataType> rows) { + super(tableInfo, type, timestamp, binlogFilename, binlogPosition); + this.includedColumns = includedColumns; + this.delegate = new BaseRowEventInfo<>(tableInfo, type, timestamp, rows); + } + + public BitSet getIncludedColumns() { + return includedColumns; + } + + @Override + public List<RowEventDataType> getRows() { + return delegate.getRows(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java new file mode 100644 index 0000000..763d695 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import org.apache.nifi.cdc.event.BaseTableEventInfo; +import org.apache.nifi.cdc.event.ColumnDefinition; +import org.apache.nifi.cdc.event.TableEventInfo; +import org.apache.nifi.cdc.event.TableInfo; + +import java.util.List; + +/** + * A base class to handle data common to binlog table events, such as database name, table name, etc. + */ +public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements BinlogTableEventInfo { + + private TableEventInfo delegate; + + public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogFilename, Long binlogPosition) { + super(eventType, timestamp, binlogFilename, binlogPosition); + this.delegate = new BaseTableEventInfo(tableInfo, SCHEMA_CHANGE, timestamp); + } + + @Override + public String getDatabaseName() { + return delegate.getDatabaseName(); + } + + @Override + public String getTableName() { + return delegate.getTableName(); + } + + @Override + public Long getTableId() { + return delegate.getTableId(); + } + + @Override + public List<ColumnDefinition> getColumns() { + return delegate.getColumns(); + } + + @Override + public ColumnDefinition getColumnByIndex(int i) { + return delegate.getColumnByIndex(i); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java new file mode 100644 index 0000000..adf96be --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BeginTransactionEventInfo.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +/** + * An event implementation corresponding to the beginning of a MySQL transaction (update rows, e.g.) + */ +public class BeginTransactionEventInfo extends BaseBinlogEventInfo { + + public BeginTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) { + super(BEGIN_EVENT, timestamp, binlogFilename, binlogPosition); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java new file mode 100644 index 0000000..6398d4f --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventInfo.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import org.apache.nifi.cdc.event.EventInfo; + +/** + * An interface for MYSQL binlog-specific events. + */ +public interface BinlogEventInfo extends EventInfo { + + String BINLOG_FILENAME_KEY = "binlog.filename"; + String BINLOG_POSITION_KEY = "binlog.position"; + + String getBinlogFilename(); + + Long getBinlogPosition(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java new file mode 100644 index 0000000..343cced --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; +import com.github.shyiko.mysql.binlog.event.Event; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * An event listener wrapper for MYSQL binlog events generated from the mysql-binlog-connector. + */ +public class BinlogEventListener implements BinaryLogClient.EventListener { + + protected final AtomicBoolean stopNow = new AtomicBoolean(false); + private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100; + + private final BlockingQueue<RawBinlogEvent> queue; + private final BinaryLogClient client; + + public BinlogEventListener(BinaryLogClient client, BlockingQueue<RawBinlogEvent> q) { + this.client = client; + this.queue = q; + } + + public void start() { + stopNow.set(false); + } + + public void stop() { + stopNow.set(true); + } + + @Override + public void onEvent(Event event) { + while (!stopNow.get()) { + RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename()); + try { + if (queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, TimeUnit.MILLISECONDS)) { + return; + } else { + throw new RuntimeException("Unable to add event to the queue"); + } + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while adding event to the queue"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java new file mode 100644 index 0000000..50796e4 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogLifecycleListener.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import com.github.shyiko.mysql.binlog.BinaryLogClient; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * An listener wrapper for mysql-binlog-connector lifecycle events. + */ +public class BinlogLifecycleListener implements BinaryLogClient.LifecycleListener { + + AtomicReference<BinaryLogClient> client = new AtomicReference<>(null); + AtomicReference<Exception> exception = new AtomicReference<>(null); + + @Override + public void onConnect(BinaryLogClient binaryLogClient) { + client.set(binaryLogClient); + exception.set(null); + } + + @Override + public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception e) { + client.set(binaryLogClient); + exception.set(e); + } + + @Override + public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception e) { + client.set(binaryLogClient); + exception.set(e); + } + + @Override + public void onDisconnect(BinaryLogClient binaryLogClient) { + client.set(binaryLogClient); + } + + public BinaryLogClient getClient() { + return client.get(); + } + + public Exception getException() { + return exception.get(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java new file mode 100644 index 0000000..43a50ad --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogTableEventInfo.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import org.apache.nifi.cdc.event.TableEventInfo; + +/** + * A marker interface for those classes wishing to implement binlog-specific methods as well as table-generic methods (getDatabase, e.g.) + */ +public interface BinlogTableEventInfo extends BinlogEventInfo, TableEventInfo { +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java new file mode 100644 index 0000000..96a84d3 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/CommitTransactionEventInfo.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + + +/** + * An event implementation corresponding to the beginning of a MySQL transaction (update rows, e.g.) + */ +public class CommitTransactionEventInfo extends BaseBinlogEventInfo { + + public CommitTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) { + super(COMMIT_EVENT, timestamp, binlogFilename, binlogPosition); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java new file mode 100644 index 0000000..3b538ec --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DeleteRowsEventInfo.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import org.apache.nifi.cdc.event.TableInfo; + +import java.io.Serializable; + +/** + * This class represents information about rows deleted from a MySQL table + */ +public class DeleteRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]> { + + public DeleteRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, DeleteRowsEventData data) { + super(tableInfo, DELETE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java new file mode 100644 index 0000000..0dd7f12 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/InsertRowsEventInfo.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import org.apache.nifi.cdc.event.TableInfo; + +import java.io.Serializable; + +/** + * This class represents information about rows written/added to a MySQL table + */ +public class InsertRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]> { + + private WriteRowsEventData data; + + public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, WriteRowsEventData data) { + super(tableInfo, WRITE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows()); + this.data = data; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java new file mode 100644 index 0000000..313bfce --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import java.io.Serializable; + +/** + * A utility class to provide MySQL- / binlog-specific constants and methods for processing events and data + */ +public class MySQLCDCUtils { + + public static Object getWritableObject(Integer type, Serializable value) { + if (value == null) { + return null; + } + if (type == null) { + if (value instanceof byte[]) { + return new String((byte[]) value); + } else if (value instanceof Number) { + return value; + } + } else if (value instanceof Number) { + return value; + } else { + if (value instanceof byte[]) { + return new String((byte[]) value); + } else { + return value.toString(); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java new file mode 100644 index 0000000..c3ccabf --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/RawBinlogEvent.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import com.github.shyiko.mysql.binlog.event.Event; + +/** + * An object holder for raw binlog events + */ +public class RawBinlogEvent { + + private Event event; + private String binlogFilename; + + public RawBinlogEvent(Event event, String binlogFilename) { + this.event = event; + this.binlogFilename = binlogFilename; + } + + public Event getEvent() { + return event; + } + + public void setEvent(Event event) { + this.event = event; + } + + public String getBinlogFilename() { + return binlogFilename; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java new file mode 100644 index 0000000..a385b11 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import org.apache.nifi.cdc.event.TableEventInfo; +import org.apache.nifi.cdc.event.TableInfo; + + +/** + * An event class corresponding to table schema changes (add/drop column, add/drop table, etc.) + */ +public class SchemaChangeEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo { + + private String query; + + public SchemaChangeEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) { + super(tableInfo, SCHEMA_CHANGE, timestamp, binlogFilename, binlogPosition); + this.query = query; + } + + public String getQuery() { + return query; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/8f37ad45/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java new file mode 100644 index 0000000..2e20dc6 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/UpdateRowsEventInfo.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import org.apache.nifi.cdc.event.TableInfo; + +import java.io.Serializable; +import java.util.BitSet; +import java.util.Map; + +/** + * This class represents information about rows written/added to a MySQL table + */ +public class UpdateRowsEventInfo extends BaseBinlogRowEventInfo<Map.Entry<Serializable[], Serializable[]>> { + + private BitSet includedColumnsBeforeUpdate; + + public UpdateRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, UpdateRowsEventData data) { + super(tableInfo, UPDATE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows()); + includedColumnsBeforeUpdate = data.getIncludedColumnsBeforeUpdate(); + } + + public BitSet getIncludedColumnsBeforeUpdate() { + return includedColumnsBeforeUpdate; + } +}
