[
https://issues.apache.org/jira/browse/NIFI-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958844#comment-15958844
]
ASF GitHub Bot commented on NIFI-3413:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1618#discussion_r110150485
--- Diff:
nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cdc.mysql.event.io;
+
+import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.cdc.event.io.AbstractEventWriter;
+import org.apache.nifi.processor.Relationship;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An abstract base class for writing MySQL binlog events into flow file(s).
+ */
+public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> extends AbstractEventWriter<T> {
+
+    protected void writeJson(T event) throws IOException {
+        jsonGenerator.writeStringField("binlog_filename", event.getBinlogFilename());
+        jsonGenerator.writeNumberField("binlog_position", event.getBinlogPosition());
+    }
+
+    protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
+        return new HashMap<String, String>() {
+            {
+                put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
+                put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
+                put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
+                put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+                put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+            }
+        };
+    }
+
+ // Default implementation for binlog events
+ @Override
+    public long writeEvent(ProcessSession session, T eventInfo, long currentSequenceId, Relationship relationship) {
+        FlowFile flowFile = session.create();
--- End diff --
Argh, I forgot this one, sorry! Will add it
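Side note on the `getCommonAttributes` method in the diff above: it uses double-brace initialization, which creates an anonymous `HashMap` subclass per call. A plain-`HashMap` sketch with the same contents behaves identically; the string keys below are hypothetical stand-ins for the real NiFi constants (`SEQUENCE_ID_KEY`, `CDC_EVENT_TYPE_ATTRIBUTE`, etc.), which live in the NiFi CDC API:

```java
import java.util.HashMap;
import java.util.Map;

public class CommonAttributesSketch {
    // Hypothetical stand-ins for the real NiFi attribute-key constants
    static final String SEQUENCE_ID_KEY = "cdc.sequence.id";
    static final String CDC_EVENT_TYPE_ATTRIBUTE = "cdc.event.type";
    static final String BINLOG_FILENAME_KEY = "binlog.filename";
    static final String BINLOG_POSITION_KEY = "binlog.position";
    static final String MIME_TYPE_KEY = "mime.type";
    static final String APPLICATION_JSON = "application/json";

    // Same five attributes as the double-brace version, no anonymous subclass
    static Map<String, String> getCommonAttributes(long sequenceId, String eventType,
                                                   String binlogFilename, long binlogPosition) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
        attrs.put(CDC_EVENT_TYPE_ATTRIBUTE, eventType);
        attrs.put(BINLOG_FILENAME_KEY, binlogFilename);
        attrs.put(BINLOG_POSITION_KEY, Long.toString(binlogPosition));
        attrs.put(MIME_TYPE_KEY, APPLICATION_JSON);
        return attrs;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = getCommonAttributes(42L, "insert", "mysql-bin.000001", 4L);
        System.out.println(attrs);
    }
}
```

The plain map also serializes and copies cheaply; the double-brace form additionally pins a reference to the enclosing instance, which matters if the map outlives the writer.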
> Implement a CaptureChangeMySQL processor
> ----------------------------------------
>
> Key: NIFI-3413
> URL: https://issues.apache.org/jira/browse/NIFI-3413
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Fix For: 1.2.0
>
>
> Database systems such as MySQL, Oracle, and SQL Server allow access to their
> transactional logs and such, in order for external clients to have a "change
> data capture" (CDC) capability. As an initial effort, I propose a
> CaptureChangeMySQL processor to enable this in NiFi. This would incorporate
> any APIs necessary for follow-on Jira cases to implement CDC processors for
> databases such as Oracle, SQL Server, PostgreSQL, etc.
> The processor would include properties needed for database connectivity
> (unless using a DBCPConnectionPool would suffice), as well as any to
> configure third-party clients (mysql-binlog-connector, e.g.). It would also
> need to keep a "sequence ID" such that an EnforceOrder processor (NIFI-3414)
> for example could guarantee the order of CDC events for use cases such as
> replication. It will likely need State Management for that, and may need
> other facilities such as a DistributedMapCache in order to keep information
> (column names and types, e.g.) that enrich the raw CDC events.
> The processor would accept no incoming connections (it is a "get" or source
> processor), would be intended to run on the primary node only as a single
> threaded processor, and would generate a flow file for each operation
> (INSERT, UPDATE, DELETE, e.g.) in one or more formats (JSON, e.g.).
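Based on the `writeJson` method in the diff above, each JSON-formatted event flow file would carry at least these two fields (a hypothetical sketch; concrete event subclasses would add their own fields, and the sequence ID and event type travel as flow file attributes rather than in the body):

```json
{
  "binlog_filename": "mysql-bin.000001",
  "binlog_position": 4
}
```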
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)