zeyk created NIFI-7820:
--------------------------
Summary: How to connect to a DBCP connection pool controller service
and execute SQL using that connection via Python
Key: NIFI-7820
URL: https://issues.apache.org/jira/browse/NIFI-7820
Project: Apache NiFi
Issue Type: Task
Components: NiFi Stateless
Reporter: zeyk
I have a Python script, updated with my own logic, that replaces enum indexes
with their values. To do so, I need to execute certain SQL commands to get some
values. The best approach would be to obtain a connection from the DBCP
connection pool controller service in NiFi and execute the commands through it,
but I don't know how to implement that in Python, since some modules such as
pymysql are not supported in NiFi (Jython). Please find the code below.
Any kind of help would be highly appreciated. Thanks in advance.
{color:#89ddff}import{color}{color:#eeffff} json{color}
{color:#89ddff}import{color}{color:#eeffff} re{color}
{color:#89ddff}import{color}{color:#eeffff} sys{color}
{color:#89ddff}import{color}{color:#eeffff} traceback{color}
{color:#89ddff}from{color}{color:#eeffff} java.nio.charset
{color}{color:#89ddff}import{color}{color:#eeffff} StandardCharsets{color}
{color:#89ddff}from{color}{color:#eeffff} org.apache.commons.io
{color}{color:#89ddff}import{color}{color:#eeffff} IOUtils{color}
{color:#89ddff}from{color}{color:#eeffff} org.apache.nifi.processor.io
{color}{color:#89ddff}import{color}{color:#eeffff} StreamCallback{color}
{color:#89ddff}from{color}{color:#eeffff} org.python.core.util
{color}{color:#89ddff}import{color}{color:#eeffff} StringUtil{color}
{color:#c792ea}class{color}{color:#eeffff}
{color}{color:#ffcb6b}TransformCallback{color}{color:#89ddff}({color}{color:#c3e88d}StreamCallback{color}{color:#89ddff}):{color}
{color:#eeffff} {color}{color:#c792ea}def{color}{color:#eeffff}
{color}{color:#82aaff}__init__{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff}):{color}
{color:#eeffff} {color}{color:#89ddff}pass{color}
{color:#eeffff} {color}{color:#c792ea}def{color}{color:#eeffff}
{color}{color:#82aaff}process{color}{color:#89ddff}({color}{color:#ff5370}self{color}{color:#89ddff},{color}{color:#eeffff}
{color}{color:#ff5370}inputStream{color}{color:#89ddff},{color}{color:#eeffff}
{color}{color:#ff5370}outputStream{color}{color:#89ddff}):{color}
{color:#eeffff} {color}{color:#89ddff}try{color}{color:#eeffff}:{color}
{color:#89ddff} {color}{color:#546e7a}# Read input FlowFile
content{color}
{color:#eeffff} input_text
{color}{color:#c792ea}={color}{color:#eeffff}
IOUtils.toString{color}{color:#89ddff}({color}{color:#eeffff}inputStream,
StandardCharsets.UTF_8{color}{color:#89ddff}){color}
{color:#eeffff} input_obj
{color}{color:#c792ea}={color}{color:#eeffff}
json.loads{color}{color:#89ddff}({color}{color:#eeffff}input_text{color}{color:#89ddff}){color}
{color:#eeffff} {color}
{color:#eeffff} {color}
{color:#eeffff} {color}{color:#eeffff}table_name
{color}{color:#c792ea}={color}{color:#eeffff}
input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}table_name{color}{color:#89ddff}'{color}{color:#89ddff}]{color}
{color:#eeffff} column_name
{color}{color:#c792ea}={color}{color:#eeffff}
{color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d}
column_name {color}{color:#f78c6c}FROM{color}{color:#c3e88d}
INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d}
table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"'
{color}{color:#f78c6c}AND{color}{color:#c3e88d}
data_type{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}enum'{color}{color:#89ddff}"{color}
{color:#eeffff} enum_value_sql
{color}{color:#c792ea}={color}{color:#eeffff}
{color}{color:#89ddff}"{color}{color:#f78c6c}SELECT{color}{color:#c3e88d}
{color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}({color}{color:#82aaff}REPLACE{color}{color:#c3e88d}(column_type,{color}{color:#89ddff}'{color}{color:#c3e88d}enum',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d})',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}(',{color}{color:#89ddff}'{color}{color:#c3e88d}'),{color}{color:#89ddff}'{color}{color:#c3e88d}\'',{color}{color:#89ddff}'{color}{color:#c3e88d}')
enums {color}{color:#f78c6c}FROM{color}{color:#c3e88d}
INFORMATION_SCHEMA.COLUMNS {color}{color:#f78c6c}WHERE{color}{color:#c3e88d}
table_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+table_name+"'
{color}{color:#f78c6c}AND{color}{color:#c3e88d}
column_name{color}{color:#c792ea}={color}{color:#89ddff}'{color}{color:#c3e88d}"+column_name+"'{color}{color:#89ddff}"{color}
{color:#eeffff} enum_value
{color}{color:#c792ea}={color}{color:#eeffff}
enum_value_sql.split{color}{color:#89ddff}({color}{color:#89ddff}'{color}{color:#c3e88d},{color}{color:#89ddff}'{color}{color:#89ddff}){color}
{color:#eeffff} {color}{color:#89ddff}for{color}{color:#eeffff} col
{color}{color:#c792ea}in{color}{color:#eeffff}
input_obj{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}columns{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff}:{color}
{color:#eeffff} {color}{color:#89ddff}if{color}{color:#eeffff}
col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}name{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff}
{color}{color:#c792ea}=={color}{color:#eeffff}
{color}{color:#b2ccd6}str{color}{color:#89ddff}({color}{color:#eeffff}column_name{color}{color:#89ddff}){color}{color:#eeffff}:{color}
{color:#eeffff}
col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]{color}{color:#eeffff}
{color}{color:#c792ea}={color}{color:#eeffff}
enum_value{color}{color:#89ddff}[{color}{color:#b2ccd6}int{color}{color:#89ddff}({color}{color:#eeffff}col{color}{color:#89ddff}[{color}{color:#89ddff}'{color}{color:#c3e88d}value{color}{color:#89ddff}'{color}{color:#89ddff}]){color}{color:#eeffff}
{color}{color:#c792ea}-{color}{color:#eeffff}
{color}{color:#f78c6c}1{color}{color:#89ddff}]{color}
{color:#eeffff} output_text
{color}{color:#c792ea}={color}{color:#eeffff}
json.dumps{color}{color:#89ddff}({color}{color:#eeffff}input_obj{color}{color:#89ddff}){color}
{color:#eeffff}
outputStream.write{color}{color:#89ddff}({color}{color:#eeffff}StringUtil.toBytes{color}{color:#89ddff}({color}{color:#eeffff}output_text{color}{color:#89ddff})){color}
{color:#eeffff}
{color}{color:#89ddff}except{color}{color:#eeffff}:{color}
{color:#eeffff}
traceback.print_exc{color}{color:#89ddff}({color}{color:#ff5370}file{color}{color:#c792ea}={color}{color:#eeffff}sys.stdout{color}{color:#89ddff}){color}
{color:#eeffff} {color}{color:#89ddff}raise{color}
{color:#eeffff}flowFile {color}{color:#c792ea}={color}{color:#eeffff}
session.get{color}{color:#89ddff}(){color}
{color:#89ddff}if{color}{color:#eeffff} flowFile
{color}{color:#c792ea}!={color}{color:#eeffff}
{color}{color:#f78c6c}None{color}{color:#eeffff}:{color}
{color:#eeffff} flowFile {color}{color:#c792ea}={color}{color:#eeffff}
session.write{color}{color:#89ddff}({color}{color:#eeffff}flowFile,
TransformCallback{color}{color:#89ddff}()){color}
{color:#89ddff} {color}{color:#546e7a}# Finish by transferring the FlowFile
to an output relationship{color}
{color:#eeffff}session.transfer{color}{color:#89ddff}({color}{color:#eeffff}flowFile,
REL_SUCCESS{color}{color:#89ddff}){color}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)