[ 
https://issues.apache.org/jira/browse/NIFI-7785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195173#comment-17195173
 ] 

Roberto Garcia edited comment on NIFI-7785 at 9/14/20, 2:53 PM:
----------------------------------------------------------------

Table structure:
{code:sql}
CREATE TABLE `sample` (
  `id` int NOT NULL AUTO_INCREMENT,
  `fruit` enum('apple','pear','orange') NOT NULL,
  `price` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
{code}

I use an "ExecuteGroovyScript" processor.

I added two properties:
||Property||Value||Comment||
|DBCPConnectionPoolName|SampleMySQLPool|The name of your DBCPConnectionPool (1.12.0), configured with:
 Database Connection URL: jdbc:mysql://localhost:3306/sample
 Database Driver Class Name: com.mysql.cj.jdbc.Driver
 Database Driver Location(s): /usr/share/java/mysql-connector-java-8.0.21.jar|
|sqlCmd|SELECT REPLACE(REPLACE(REPLACE(REPLACE(column_type,'enum',''),')',''),'(',''),'\'','') enums
 FROM INFORMATION_SCHEMA.COLUMNS
 WHERE table_name='sample' AND column_name='fruit'|This query returns the enum values of your column "fruit" as a comma-separated list.|
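To illustrate what the sqlCmd query returns: for the table above, column_type holds the string enum('apple','pear','orange'), and the nested REPLACE calls strip the enum keyword, the parentheses, and the quotes. The following is only a sketch of that same string handling in plain Java; the class and method names are hypothetical, and it does not query MySQL.

```java
import java.util.Arrays;
import java.util.List;

public class EnumLabels {
    // Mirrors the four nested REPLACE calls of the sqlCmd query.
    static String stripEnumSyntax(String columnType) {
        return columnType.replace("enum", "")
                         .replace(")", "")
                         .replace("(", "")
                         .replace("'", "");
    }

    // Splits the stripped string into the individual enum labels.
    static List<String> labels(String columnType) {
        return Arrays.asList(stripEnumSyntax(columnType).split(","));
    }

    public static void main(String[] args) {
        // prints [apple, pear, orange]
        System.out.println(labels("enum('apple','pear','orange')"));
    }
}
```

Note that this approach mangles labels that themselves contain a comma, a parenthesis, a quote, or the text "enum", so it only suits simple labels like the ones in this table.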

 
Finally, here is my Groovy script:
{code:java}
import java.nio.charset.StandardCharsets
import java.util.regex.Matcher
import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import groovy.sql.Sql

// Dynamic properties defined on the processor
def serviceName = DBCPConnectionPoolName.value
def sqlCmdString = sqlCmd.value

def flowFile = session.get()
if (!flowFile) return

// Get the controller service lookup from the context
def lookup = context.controllerServiceLookup
// Search for serviceName among the controller services
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
    cs -> lookup.getControllerServiceName(cs) == serviceName
}
// Get the service from its id and connect to it
def service = lookup.getControllerService(dbcpServiceId)
def conn = service.getConnection()
if (!conn) {
    log.error("Failed to connect to " + serviceName)
    session.transfer(flowFile, REL_FAILURE)
    return
}
try {
    // Fetch the enum labels, e.g. "apple,pear,orange"
    def row = new Sql(conn).firstRow(sqlCmdString)
    def enumArr = "${row.enums}".trim().split(',')
    // Matches the numeric value of the column with "id":2 (the enum column)
    def pattern = /(?s)(?<="id":2,"value":)(\d+?)(?=\})/

    flowFile = session.write(flowFile, { inputStream, outputStream ->
        def content = IOUtils.toString(inputStream, StandardCharsets.UTF_8).trim()
        String cdcConverted = content
        Matcher regexMatcher = content =~ pattern
        if (regexMatcher.find()) {
            // The captured value is used as a 1-based index into the enum list
            def enumText = enumArr[regexMatcher.group(1).toInteger() - 1]
            cdcConverted = content.replaceAll(pattern, Matcher.quoteReplacement('"' + enumText + '"'))
        }
        outputStream.write(cdcConverted.getBytes(StandardCharsets.UTF_8))
    } as StreamCallback)
    session.transfer(flowFile, REL_SUCCESS)
} catch (e) {
    log.error('Scripting error: ' + sqlCmdString, e)
    session.transfer(flowFile, REL_FAILURE)
} finally {
    // Release the connection; this is important as it will otherwise block new executions
    conn?.close()
}
{code}
*Input:*  
{code:java}
{"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[
{"id":1,"value":57}
,{"id":2,"value":2},{"id":3,"value":300}]}
{code}
*Output:*
{code:java}
{"type":"insert","timestamp":1600053851000,"binlog_filename":"binlog.000019","binlog_position":3402,"database":null,"table_name":null,"table_id":null,"columns":[
{"id":1,"value":57}
,{"id":2,"value":"pear"},{"id":3,"value":300}]}
{code}
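
The input-to-output conversion above can be tried outside NiFi. The following is a minimal Java sketch, not the processor script itself: the class and method names are hypothetical, the labels are passed in directly instead of being read from MySQL, and each match is replaced with its own label. It assumes the index is 1-based, as in the sample (2 becomes "pear").

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EnumIndexReplacer {
    // Same idea as the script's pattern: the numeric value of the column with "id":2.
    private static final Pattern ENUM_VALUE =
            Pattern.compile("(?<=\"id\":2,\"value\":)(\\d+)(?=\\})");

    static String convert(String json, String[] labels) {
        Matcher m = ENUM_VALUE.matcher(json);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            int index = Integer.parseInt(m.group(1));
            // Assumption: the index is 1-based, as in the sample above.
            String label = labels[index - 1];
            m.appendReplacement(out, Matcher.quoteReplacement("\"" + label + "\""));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String input = "{\"columns\":[{\"id\":1,\"value\":57},"
                + "{\"id\":2,\"value\":2},{\"id\":3,\"value\":300}]}";
        System.out.println(convert(input, new String[] {"apple", "pear", "orange"}));
    }
}
```

An index of 0 would throw an ArrayIndexOutOfBoundsException here. If your events carry 0-based values, as the 0 and 1 in the issue description suggest, drop the "- 1".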
 

 


> CaptureChangeMySQL processor captures enum values as "INDEX of those values" 
> from Mysql DB"
> -------------------------------------------------------------------------------------------
>
>                 Key: NIFI-7785
>                 URL: https://issues.apache.org/jira/browse/NIFI-7785
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Tools and Build
>    Affects Versions: 1.11.4
>         Environment: Ubuntu EC2 instance with 8 GB ram
>            Reporter: zeyk
>            Priority: Major
>              Labels: features
>         Attachments: flow.xml.gz
>
>
> The CaptureChangeMySQL processor captures enum values as the index of each 
> value rather than the value specified.
> For example:
> A table has columns (id int, fruit enum ('apple','pears','orange'), price int)
> On doing an insert:
> insert into (1,'apple',45)
> insert into (2,'pears',56)
> I used the CaptureChangeMySQL processor to capture the CDC changes. The 
> processor does capture them, but it records the enum column by its index, 
> as in the samples below:
> for 1st insert:
>  
> {
>  "type":"insert",
>  "timestamp":1599004442000,
>  "binlog_filename":"mysql-bin-changelog.000039",
>  "binlog_position":1537835,
>  "database":"sample",
>  "table_name":"sample",
>  "table_id":82,
>  "columns":[
>  {
>  "id":1,
>  "name":"id",
>  "column_type":-5,
>  "value":139
>  },
>  {
>  "id":2,
>  "name":"fruit",
>  "column_type":12,
>  "value":0
>  },
>  {
>  "id":3,
>  "name":"price",
>  "column_type":12,
>  "value":45
>  }
>  ]
> }
>  
> for 2nd insert:
>  
> {
>  "type":"insert",
>  "timestamp":1599004442000,
>  "binlog_filename":"mysql-bin-changelog.000039",
>  "binlog_position":1537835,
>  "database":"sample",
>  "table_name":"sample",
>  "table_id":82,
>  "columns":[
>  {
>  "id":1,
>  "name":"id",
>  "column_type":-5,
>  "value":139
>  },
>  {
>  "id":2,
>  "name":"fruit",
>  "column_type":12,
>  "value":1
>  },
>  {
>  "id":3,
>  "name":"price",
>  "column_type":12,
>  "value":56
>  }
>  ]
> }
>  
>  
> So the above has 0 and 1 in place of apple and pears respectively.
>  
> Could any of you help me with this, if there are folks who have faced a 
> similar issue?
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
