[
https://issues.apache.org/jira/browse/NIFI-3413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941188#comment-15941188
]
ASF GitHub Bot commented on NIFI-3413:
--------------------------------------
Github user phrocker commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1618#discussion_r107917776
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/db/impl/mysql/event/io/InsertRowsWriter.java
---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.db.impl.mysql.event.io;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.GetChangeDataCaptureMySQL;
+import org.apache.nifi.processors.standard.db.event.ColumnDefinition;
+import org.apache.nifi.processors.standard.db.impl.mysql.event.InsertRowsEventInfo;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.BitSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.nifi.processors.standard.db.impl.mysql.MySQLCDCUtils.getWritableObject;
+
+/**
+ * A writer class to output MySQL binlog "write rows" (aka INSERT) events to flow file(s).
+ */
+public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsEventInfo> {
+
+    /**
+     * Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, with the sequence ID attribute set.
+     *
+     * @param session   A reference to a ProcessSession from which the flow file(s) will be created and transferred
+     * @param eventInfo An event whose value will become the contents of the flow file
+     * @return The next available CDC sequence ID for use by the CDC processor
+     */
+    public long writeEvent(final ProcessSession session, final InsertRowsEventInfo eventInfo, final long currentSequenceId) {
+        final AtomicLong seqId = new AtomicLong(currentSequenceId);
+        for (Serializable[] row : eventInfo.getRows()) {
+
+            FlowFile flowFile = session.create();
+            flowFile = session.write(flowFile, outputStream -> {
+
+                super.startJson(outputStream, eventInfo);
+                super.writeJson(eventInfo);
+
+                final BitSet bitSet = eventInfo.getIncludedColumns();
+                writeRow(eventInfo, row, bitSet);
+
+                super.endJson();
+            });
+
+            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
+            session.transfer(flowFile, GetChangeDataCaptureMySQL.REL_SUCCESS);
+            seqId.getAndIncrement();
+        }
+        return seqId.get();
+    }
+
+    protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
+        jsonGenerator.writeArrayFieldStart("columns");
+        int i = includedColumns.nextSetBit(0);
+        while (i != -1) {
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeNumberField("id", i + 1);
+            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
+            Integer columnType = null;
+            if (columnDefinition != null) {
+                jsonGenerator.writeStringField("name", columnDefinition.getName());
+                columnType = (int) columnDefinition.getType();
--- End diff --
I left a comment earlier about this. Could we avoid the autoboxing and make
type a Long instead of an enumeration?
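The autoboxing concern above can be illustrated with a minimal sketch (all names here are hypothetical, not from the PR): assigning an `(int)` cast to an `Integer` local, as the diff does, boxes on every row processed, while a primitive with a sentinel value avoids the per-row allocation.

```java
public class AutoboxingSketch {

    // Assumed sentinel standing in for "no column definition available".
    public static final int NO_TYPE = Integer.MIN_VALUE;

    // Boxed variant, as in the diff: the int -> Integer conversion autoboxes.
    public static Integer boxedType(short rawType) {
        Integer columnType = (int) rawType; // autoboxing happens here, once per row
        return columnType;
    }

    // Primitive variant: no boxing; the sentinel marks the "unknown" case.
    public static int primitiveType(short rawType, boolean hasDefinition) {
        return hasDefinition ? rawType : NO_TYPE;
    }
}
```

Whether the boxing matters in practice depends on row volume; for a per-row hot path in a CDC stream it is a reasonable thing to flag in review.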
> Implement a GetChangeDataCapture processor
> ------------------------------------------
>
> Key: NIFI-3413
> URL: https://issues.apache.org/jira/browse/NIFI-3413
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Matt Burgess
> Assignee: Matt Burgess
>
> Database systems such as MySQL, Oracle, and SQL Server allow access to their
> transactional logs and such, in order for external clients to have a "change
> data capture" (CDC) capability. I propose a GetChangeDataCapture processor to
> enable this in NiFi.
> The processor would be configured with a DBCPConnectionPool controller
> service, as well as a Database Type property (similar to the one in
> QueryDatabaseTable) for database-specific handling. Additional properties
> might include the CDC table name, etc. Additional database-specific
> properties could be handled using dynamic properties (and the documentation
> should reflect this).
> The processor would accept no incoming connections (it is a "Get" or source
> processor), would be intended to run on the primary node only as a single
> threaded processor, and would generate a flow file for each operation
> (INSERT, UPDATE, DELETE, etc.) in one or more formats (JSON, e.g.).
> The flow files would be transferred in time order (to enable a replication
> solution, for example), perhaps with some auto-incrementing attribute to also
> indicate order if need be.
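The per-event sequencing described above (one flow file per row, with an auto-incrementing attribute to preserve time order) can be sketched without any NiFi dependencies roughly as follows; `Event`, `emitRows`, and the record layout are illustrative stand-ins, not the actual NiFi API:

```java
import java.util.List;

// Illustrative model of the sequencing contract: every captured row becomes
// one record carrying an auto-incrementing sequence ID, and the method
// returns the next available ID, mirroring writeEvent() in the diff above.
public class CdcSequenceSketch {

    // Hypothetical stand-in for a flow file plus its sequence-ID attribute.
    public record Event(long sequenceId, String json) {}

    public static long emitRows(List<String> rows, long currentSequenceId, List<Event> out) {
        long seqId = currentSequenceId;
        for (String row : rows) {
            out.add(new Event(seqId, "{\"row\":\"" + row + "\"}"));
            seqId++; // one sequence ID per emitted row, preserving time order
        }
        return seqId; // next available sequence ID for the processor's state
    }
}
```

Returning the next available ID (rather than the last used one) lets the caller thread the counter through successive binlog events without off-by-one bookkeeping.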
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)