[ https://issues.apache.org/jira/browse/NIFI-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943366#comment-15943366 ]
ASF GitHub Bot commented on NIFI-3413:
--------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1618#discussion_r108184785
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/mysql/event/io/AbstractBinlogTableEventWriter.java ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl.mysql.event.io;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.GetChangeDataCaptureMySQL;
+import org.apache.nifi.processors.standard.db.impl.mysql.event.BinlogTableEventInfo;
+
+import java.io.IOException;
+
+/**
+ * An abstract base class for writing MYSQL binlog events into flow file(s), e.g.
+ */
+public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
+
+ protected void writeJson(T event) throws IOException {
+ super.writeJson(event);
+ if (event.getDatabaseName() != null) {
+ jsonGenerator.writeStringField("database", event.getDatabaseName());
+ } else {
+ jsonGenerator.writeNullField("database");
+ }
+ if (event.getTableName() != null) {
+ jsonGenerator.writeStringField("table_name", event.getTableName());
+ } else {
+ jsonGenerator.writeNullField("table_name");
+ }
+ if (event.getTableId() != null) {
+ jsonGenerator.writeNumberField("table_id", event.getTableId());
+ } else {
+ jsonGenerator.writeNullField("table_id");
+ }
+ }
+
+ // Default implementation for binlog events
+ @Override
+ public long writeEvent(ProcessSession session, T eventInfo, long currentSequenceId) {
+ FlowFile flowFile = session.create();
+ flowFile = session.write(flowFile, (outputStream) -> {
+ super.startJson(outputStream, eventInfo);
+ super.writeJson(eventInfo);
--- End diff --
Good catch! Strangely, I didn't see it manifest itself in the unit tests
(as multiple fields, e.g.); perhaps the JSON generator keeps a Map at each
level and the fields just kept getting overwritten. In any case, I will remove
it (and update the comments for this class to mention "table-related" events so
that they no longer duplicate the comments from its parent class).
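
For illustration only, here is a minimal stdlib-only sketch of the two behaviours speculated about above. It assumes the problem is that the same field-writing call runs twice for one event, which this thread does not confirm. The class and method names are hypothetical stand-ins and are not part of NiFi or of the JSON library used by the processor. A writer that appends straight to the output emits the field twice, whereas a Map-backed structure silently keeps one entry:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-ins for illustration; not NiFi or JSON-library classes. */
final class DuplicateFieldSketch {

    /** Streaming style: each call appends a field with no memory of earlier names. */
    static String streamTwice() {
        StringBuilder out = new StringBuilder("{");
        writeCommonFields(out);
        writeCommonFields(out); // the accidental second call
        return out.append("}").toString();
    }

    private static void writeCommonFields(StringBuilder out) {
        if (out.length() > 1) {
            out.append(","); // separate from the previously written field
        }
        out.append("\"type\":\"insert\"");
    }

    /** Map style: the second put overwrites the first, so the duplicate is hidden. */
    static Map<String, Object> mapTwice() {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("type", "insert");
        fields.put("type", "insert"); // same key, entry is replaced
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(streamTwice());
        System.out.println(mapTwice());
    }
}
```

If a unit test parses the generated JSON into a Map before asserting on it, a duplicated field could go unnoticed in the same way, so comparing the raw output string is one way to catch it.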
> Implement a GetChangeDataCapture processor
> ------------------------------------------
>
> Key: NIFI-3413
> URL: https://issues.apache.org/jira/browse/NIFI-3413
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> Database systems such as MySQL, Oracle, and SQL Server allow access to their
> transactional logs and such, in order for external clients to have a "change
> data capture" (CDC) capability. I propose a GetChangeDataCapture processor to
> enable this in NiFi.
> The processor would be configured with a DBCPConnectionPool controller
> service, as well as a Database Type property (similar to the one in
> QueryDatabaseTable) for database-specific handling. Additional properties
> might include the CDC table name, etc. Additional database-specific
> properties could be handled using dynamic properties (and the documentation
> should reflect this).
> The processor would accept no incoming connections (it is a "Get" or source
> processor), would be intended to run on the primary node only as a single
> threaded processor, and would generate a flow file for each operation
> (INSERT, UPDATE, DELETE, e.g.) in one or some number of formats (JSON, e.g.).
> The flow files would be transferred in time order (to enable a replication
> solution, for example), perhaps with some auto-incrementing attribute to also
> indicate order if need be.