[ https://issues.apache.org/jira/browse/NIFI-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195173#comment-17195173 ]
Roberto Garcia edited comment on NIFI-7785 at 9/14/20, 2:53 PM: ---------------------------------------------------------------- Table structure: {code:java} CREATE TABLE `sample` ( `id` int NOT NULL AUTO_INCREMENT, `fruit` enum('apple','pear','orange') NOT NULL, `price` int NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; {code} I use an "ExecuteGroovyScript" processor, to which I added two properties: ||Property||Value||Comment|| |DBCPConnectionPoolName|SampleMySQLPool|this is your DBCPConnectionPool 1.12.0 Database Connection URL : jdbc:mysql://localhost:3306/sample Database Driver Class Name: com.mysql.cj.jdbc.Driver Database Driver Location(s) : /usr/share/java/mysql-connector-java-8.0.21.jar| |sqlCmd|SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums FROM INFORMATION_SCHEMA.COLUMNS table_name='sample' AND column_name='fruit'|this query gets the enum values of your column "fruit"| Finally, here is my Groovy script: {code:java} import java.util.regex.Matcher import org.apache.nifi.controller.ControllerService import groovy.sql.Sql // Executescript attributes def serviceName = DBCPConnectionPoolName.value def sqlCmdString = sqlCmd.value // get controller service lookup from context def lookup = context.controllerServiceLookup // search for serviceName in controller services def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == serviceName } //Get the service from serviceid def service = lookup.getControllerService(dbcpServiceId) // Connect to service def conn = service.getConnection() if (!conn) { log.error( "Failed to connect to " + serviceName) return; } try { flowFile = session.get() flowFile = session.write(flowFile, { inputStream, outputStream -> def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8) content=content.trim() def sql = new Sql(conn) 
if (!sql) { log.error( "Failed to get SQL connection") return } def row = sql.firstRow(sqlCmdString) enums="${row.enums}".trim() def enumArr = enums.split(',') String Index="" Matcher regexMatcher = content =~ /(?s)(?<="id":2,"value":)(\d+?)(?=})/ if (regexMatcher.find()) { index = regexMatcher.group(1) } def enumText= enumArr[index.toInteger() -1] String cdcConverted = content.replaceAll(/(?s)(?<="id":2,"value":)(\d+?)(?=})/, "\"" + enumText + "\""); outputStream.write((cdcConverted).getBytes("UTF-8")) } as StreamCallback) session.transfer(flowFile, REL_SUCCESS) } catch(e) { log.error('Scripting error' + sqlCmd, e) session.transfer(flowFile, REL_FAILURE) } // Release connection, this is important as it will otherwise block new executions conn?.close() {code} *Input:* {code:java} {"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[ {"id":1,"value":57} ,{"id":2,"value":2},{"id":3,"value":300}]} {code} *Output:* {code:java} {"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[ {"id":1,"value":57} ,{"id":2,"value":"pear"},{"id":3,"value":300}]} {code} was (Author: robertogarcia): I use an "ExecuteGroovyScript" Processor I added two properties ||Property||Value||Comment|| |DBCPConnectionPoolName|SampleMySQLPool|this is your DBCPConnectionPool 1.12.0 Database Connection URL : jdbc:mysql://localhost:3306/sample Database Driver Class Name: com.mysql.cj.jdbc.Driver Database Driver Location(s) : /usr/share/java/mysql-connector-java-8.0.21.jar| |sqlCmd|SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums FROM INFORMATION_SCHEMA.COLUMNS table_name='sample' AND column_name='fruit'|this query get your enums from your column "fruit"| finally here my Groovy Script: {code:java} import java.util.regex.Matcher import 
org.apache.nifi.controller.ControllerService import groovy.sql.Sql // Executescript attributes def serviceName = DBCPConnectionPoolName.value def sqlCmdString = sqlCmd.value // get controller service lookup from context def lookup = context.controllerServiceLookup // search for serviceName in controller services def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { cs -> lookup.getControllerServiceName(cs) == serviceName } //Get the service from serviceid def service = lookup.getControllerService(dbcpServiceId) // Connect to service def conn = service.getConnection() if (!conn) { log.error( "Failed to connect to " + serviceName) return; } try { flowFile = session.get() flowFile = session.write(flowFile, { inputStream, outputStream -> def content = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8) content=content.trim() def sql = new Sql(conn) if (!sql) { log.error( "Failed to get SQL connection") return } def row = sql.firstRow(sqlCmdString) enums="${row.enums}".trim() def enumArr = enums.split(',') String Index="" Matcher regexMatcher = content =~ /(?s)(?<="id":2,"value":)(\d+?)(?=})/ if (regexMatcher.find()) { index = regexMatcher.group(1) } def enumText= enumArr[index.toInteger() -1] String cdcConverted = content.replaceAll(/(?s)(?<="id":2,"value":)(\d+?)(?=})/, "\"" + enumText + "\""); outputStream.write((cdcConverted).getBytes("UTF-8")) } as StreamCallback) session.transfer(flowFile, REL_SUCCESS) } catch(e) { log.error('Scripting error' + sqlCmd, e) session.transfer(flowFile, REL_FAILURE) } // Release connection, this is important as it will otherwise block new executions conn?.close() {code} *Input:* {code:java} {"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[ {"id":1,"value":57} ,{"id":2,"value":2},{"id":3,"value":300}]} {code} *Output:* {code:java} 
{"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[ {"id":1,"value":57} ,{"id":2,"value":"pear"},{"id":3,"value":300}]} {code} > CaptureChangeMySQL processor captures enum values as "INDEX of those values" > from MySQL DB > ------------------------------------------------------------------------------------------- > > Key: NIFI-7785 > URL: https://issues.apache.org/jira/browse/NIFI-7785 > Project: Apache NiFi > Issue Type: Bug > Components: Tools and Build > Affects Versions: 1.11.4 > Environment: Ubuntu EC2 instance with 8 GB RAM > Reporter: zeyk > Priority: Major > Labels: features > Attachments: flow.xml.gz > > > CaptureChangeMySQL processor captures enum values as "INDEX of those values" > rather than the values specified. > For example: > A table has columns (id int, fruit enum ('apple','pears','orange'), price int) > On doing these inserts: > insert into (1,'apple',45) > insert into (2,'pears',56) > I have used the CaptureChangeMySQL processor to capture the CDC changes; the > process does the capture, but captures the enum column by its > index, as in the sample below: > for 1st insert: > > { > "type":"insert", > "timestamp":1599004442000, > "binlog_filename":"mysql-bin-changelog.000039", > "binlog_position":1537835, > "database":"sample", > "table_name":"sample", > "table_id":82, > "columns":[ > { > "id":1, > "name":"id", > "column_type":-5, > "value":139 > }, > { > "id":2, > "name":"fruit", > "column_type":12, > "value":0 > }, > { > "id":3, > "name":"price", > "column_type":12, > "value":45 > } > ] > } > > for 2nd insert: > > { > "type":"insert", > "timestamp":1599004442000, > "binlog_filename":"mysql-bin-changelog.000039", > "binlog_position":1537835, > "database":"sample", > "table_name":"sample", > "table_id":82, > "columns":[ > { > "id":1, > "name":"id", > "column_type":-5, > "value":139 > }, > { > "id":2, > "name":"fruit", > 
"column_type":12, > "value":1 > }, > { > "id":3, > "name":"price", > "column_type":12, > "value":56 > } > ] > } > > > So the above has 0 and 1 in place of apple and pears respectively. > > Could any of you help me with this? Are there folks who have faced a similar > kind of issue? > > -- This message was sent by Atlassian Jira (v8.3.4#803005)