[
https://issues.apache.org/jira/browse/NIFI-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195173#comment-17195173
]
Roberto Garcia edited comment on NIFI-7785 at 9/14/20, 2:53 PM:
----------------------------------------------------------------
Table Structure
{code:java}
-- Demo table used to reproduce the issue: an auto-numbered key, an ENUM column
-- and an integer price.
CREATE TABLE `sample` (
  `id`    INT NOT NULL AUTO_INCREMENT,
  -- Declaration order of the labels matters: the script below maps a numeric
  -- position back to one of these labels.
  `fruit` ENUM('apple', 'pear', 'orange') NOT NULL,
  `price` INT NOT NULL,
  PRIMARY KEY (`id`)
)
  ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8mb4
  COLLATE = utf8mb4_0900_ai_ci;
{code}
I use an "ExecuteGroovyScript" processor,
to which I added two properties:
||Property||Value||Comment||
|DBCPConnectionPoolName|SampleMySQLPool|this is your DBCPConnectionPool 1.12.0
Database Connection URL : jdbc:mysql://localhost:3306/sample
Database Driver Class Name: com.mysql.cj.jdbc.Driver
Database Driver Location(s) : /usr/share/java/mysql-connector-java-8.0.21.jar|
|sqlCmd|SELECT
REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','')
enums
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name='sample' AND column_name='fruit'|this query gets the enum values of
your column "fruit"|
Finally, here is my Groovy script:
{code:java}
import java.util.regex.Matcher
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
// Replaces the numeric enum position found in the column with "id":2 of a CDC
// JSON event by the matching enum label, looked up from the database.
//
// ExecuteGroovyScript dynamic properties read by this script:
//   DBCPConnectionPoolName - display name of the DBCP controller service to use
//   sqlCmd                 - query returning one row whose "enums" column holds
//                            the comma-separated enum labels in declaration order
//
// Routing: REL_SUCCESS when the content was rewritten (or contained nothing to
// rewrite, in which case it is passed through trimmed but otherwise unchanged);
// REL_FAILURE when the lookup query fails or the position has no matching label.
def serviceName = DBCPConnectionPoolName.value
def sqlCmdString = sqlCmd.value

// Resolve the controller service by its display name.
def lookup = context.controllerServiceLookup
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == serviceName
}
if (!dbcpServiceId) {
    log.error("No controller service named " + serviceName)
    return
}

// Borrow a connection from the pool.
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
if (!conn) {
    log.error( "Failed to connect to " + serviceName)
    return
}

def flowFile = null
try {
    flowFile = session.get()
    if (flowFile == null) {
        // Nothing queued; the finally block still returns the connection.
        return
    }

    // Fetch the enum labels once, before touching the content, so a failed
    // lookup cannot leave a half-written flow file behind.
    def row = new Sql(conn).firstRow(sqlCmdString)
    if (row == null || row.enums == null) {
        throw new IllegalStateException("sqlCmd returned no enum definition")
    }
    def enumArr = "${row.enums}".trim().split(',')

    // Matches the digits that follow "id":2,"value": and precede the closing brace.
    def enumPattern = /(?s)(?<="id":2,"value":)(\d+?)(?=})/

    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def content = org.apache.commons.io.IOUtils.toString(inputStream,
                java.nio.charset.StandardCharsets.UTF_8).trim()
        def converted = content

        Matcher regexMatcher = content =~ enumPattern
        if (regexMatcher.find()) {
            int position = regexMatcher.group(1).toInteger()
            // Positions are 1-based. Reject anything outside the label list:
            // Groovy would otherwise resolve index -1 to the LAST label and
            // silently emit a wrong value for position 0.
            if (position < 1 || position > enumArr.length) {
                throw new IllegalArgumentException(
                        "Enum position " + position + " is outside 1.." + enumArr.length)
            }
            def enumText = enumArr[position - 1]
            // quoteReplacement keeps '$' and '\' in a label from being read as
            // regex replacement syntax.
            converted = content.replaceAll(enumPattern,
                    Matcher.quoteReplacement("\"" + enumText + "\""))
        }

        outputStream.write(converted.getBytes(java.nio.charset.StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Scripting error' + sqlCmd, e)
    // flowFile is only null here if session.get() itself threw.
    if (flowFile != null) {
        session.transfer(flowFile, REL_FAILURE)
    }
} finally {
    // Release the connection on every path; a leaked connection would
    // otherwise block new executions.
    conn?.close()
}
{code}
*Input:*
{code:java}
{"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[
{"id":1,"value":57}
,{"id":2,"value":2},{"id":3,"value":300}]}
{code}
*Output:*
{code:java}
{"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[
{"id":1,"value":57}
,{"id":2,"value":"pear"},{"id":3,"value":300}]}
{code}
was (Author: robertogarcia):
I use an "ExecuteGroovyScript" processor,
to which I added two properties:
||Property||Value||Comment||
|DBCPConnectionPoolName|SampleMySQLPool|this is your DBCPConnectionPool 1.12.0
Database Connection URL : jdbc:mysql://localhost:3306/sample
Database Driver Class Name: com.mysql.cj.jdbc.Driver
Database Driver Location(s) : /usr/share/java/mysql-connector-java-8.0.21.jar|
|sqlCmd|SELECT
REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','')
enums
FROM INFORMATION_SCHEMA.COLUMNS
WHERE table_name='sample' AND column_name='fruit'|this query gets the enum values of
your column "fruit"|
Finally, here is my Groovy script:
{code:java}
import java.util.regex.Matcher
import org.apache.nifi.controller.ControllerService
import groovy.sql.Sql
// Replaces the numeric enum position found in the column with "id":2 of a CDC
// JSON event by the matching enum label, looked up from the database.
//
// ExecuteGroovyScript dynamic properties read by this script:
//   DBCPConnectionPoolName - display name of the DBCP controller service to use
//   sqlCmd                 - query returning one row whose "enums" column holds
//                            the comma-separated enum labels in declaration order
//
// Routing: REL_SUCCESS when the content was rewritten (or contained nothing to
// rewrite, in which case it is passed through trimmed but otherwise unchanged);
// REL_FAILURE when the lookup query fails or the position has no matching label.
def serviceName = DBCPConnectionPoolName.value
def sqlCmdString = sqlCmd.value

// Resolve the controller service by its display name.
def lookup = context.controllerServiceLookup
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs ->
    lookup.getControllerServiceName(cs) == serviceName
}
if (!dbcpServiceId) {
    log.error("No controller service named " + serviceName)
    return
}

// Borrow a connection from the pool.
def conn = lookup.getControllerService(dbcpServiceId).getConnection()
if (!conn) {
    log.error( "Failed to connect to " + serviceName)
    return
}

def flowFile = null
try {
    flowFile = session.get()
    if (flowFile == null) {
        // Nothing queued; the finally block still returns the connection.
        return
    }

    // Fetch the enum labels once, before touching the content, so a failed
    // lookup cannot leave a half-written flow file behind.
    def row = new Sql(conn).firstRow(sqlCmdString)
    if (row == null || row.enums == null) {
        throw new IllegalStateException("sqlCmd returned no enum definition")
    }
    def enumArr = "${row.enums}".trim().split(',')

    // Matches the digits that follow "id":2,"value": and precede the closing brace.
    def enumPattern = /(?s)(?<="id":2,"value":)(\d+?)(?=})/

    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def content = org.apache.commons.io.IOUtils.toString(inputStream,
                java.nio.charset.StandardCharsets.UTF_8).trim()
        def converted = content

        Matcher regexMatcher = content =~ enumPattern
        if (regexMatcher.find()) {
            int position = regexMatcher.group(1).toInteger()
            // Positions are 1-based. Reject anything outside the label list:
            // Groovy would otherwise resolve index -1 to the LAST label and
            // silently emit a wrong value for position 0.
            if (position < 1 || position > enumArr.length) {
                throw new IllegalArgumentException(
                        "Enum position " + position + " is outside 1.." + enumArr.length)
            }
            def enumText = enumArr[position - 1]
            // quoteReplacement keeps '$' and '\' in a label from being read as
            // regex replacement syntax.
            converted = content.replaceAll(enumPattern,
                    Matcher.quoteReplacement("\"" + enumText + "\""))
        }

        outputStream.write(converted.getBytes(java.nio.charset.StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch(e) {
    log.error('Scripting error' + sqlCmd, e)
    // flowFile is only null here if session.get() itself threw.
    if (flowFile != null) {
        session.transfer(flowFile, REL_FAILURE)
    }
} finally {
    // Release the connection on every path; a leaked connection would
    // otherwise block new executions.
    conn?.close()
}
{code}
*Input:*
{code:java}
{"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[
{"id":1,"value":57}
,{"id":2,"value":2},{"id":3,"value":300}]}
{code}
*Output:*
{code:java}
{"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[
{"id":1,"value":57}
,{"id":2,"value":"pear"},{"id":3,"value":300}]}
{code}
> CaptureChangeMySQL processor captures enum values as "INDEX of those values"
> from Mysql DB"
> -------------------------------------------------------------------------------------------
>
> Key: NIFI-7785
> URL: https://issues.apache.org/jira/browse/NIFI-7785
> Project: Apache NiFi
> Issue Type: Bug
> Components: Tools and Build
> Affects Versions: 1.11.4
> Environment: Ubuntu EC2 instance with 8 GB ram
> Reporter: zeyk
> Priority: Major
> Labels: features
> Attachments: flow.xml.gz
>
>
> CaptureChangeMySQL processor captures enum values as "INDEX of those values"
> rather than the values specified.
> for example:
> A table has columns (id int, fruit enum ('apple','pears','orange'), price int)
> On doing an insert:
> insert into (1,'apple',45)
> insert into (2,'pears',56)
> I have used CaptureChangeMySql processor to capture the CDC changes, the
> process does the capture but captures the enum column alone based on its
> index like the sample below:
> for 1st insert:
>
> {
> "type":"insert",
> "timestamp":1599004442000,
> "binlog_filename":"mysql-bin-changelog.000039",
> "binlog_position":1537835,
> "database":"sample",
> "table_name":"sample",
> "table_id":82,
> "columns":[
> {
> "id":1,
> "name":"id",
> "column_type":-5,
> "value":139
> },
> {
> "id":2,
> "name":"fruit",
> "column_type":12,
> "value":0
> },
> {
> "id":3,
> "name":"price",
> "column_type":12,
> "value":45
> }
> ]
> }
>
> for 2nd insert:
>
> {
> "type":"insert",
> "timestamp":1599004442000,
> "binlog_filename":"mysql-bin-changelog.000039",
> "binlog_position":1537835,
> "database":"sample",
> "table_name":"sample",
> "table_id":82,
> "columns":[
> {
> "id":1,
> "name":"id",
> "column_type":-5,
> "value":139
> },
> {
> "id":2,
> "name":"fruit",
> "column_type":12,
> "value":1
> },
> {
> "id":3,
> "name":"price",
> "column_type":12,
> "value":56
> }
> ]
> }
>
>
> So the above has 0 and 1 in place of apple and pears respectively.
>
> Could any of you help me with this, if there are folks who have faced a similar
> kind of issue?
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)