Hi,
I have some fundamental questions about a custom Processor I am working on.
First of all, I think I am trying to do too many things in one custom
processor (correct me if I am wrong :) since I am 1) sending an HTTP POST
request and 2) posting the JSON response to Kafka. My custom processor has
all the required properties and two methods: one that prepares and sends the
HTTP request and another that posts the result to Kafka. My OnScheduled
method checks whether the POST URL connection and the Kafka one can be
established. It looks something like this:

String http_post_url = context.getProperty(HTTP_POST_URL).getValue();
String kafka_url = context.getProperty(KAFKA_URL).getValue();

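try {
    // Build the URL from the configured property before opening the connection.
    URL httpPostUrl = new URL(http_post_url);
    HttpURLConnection httpPostUrlConnection = (HttpURLConnection) httpPostUrl.openConnection();
    // openConnection() does no network I/O; connect() is what actually opens the socket.
    httpPostUrlConnection.connect();
} catch (IOException ex) {
    ex.printStackTrace();
}

try {
    URL kafkaUrl = new URL(kafka_url);
    HttpURLConnection kafkaUrlConnection = (HttpURLConnection) kafkaUrl.openConnection();
    kafkaUrlConnection.connect();
} catch (IOException ex) {
    ex.printStackTrace();
}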
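
Note that URL.openConnection() by itself only creates the connection object; the socket is opened later, for example by connect(). As a point of comparison, here is a minimal stdlib-only sketch of a check that attempts a plain TCP connection with a timeout, assuming each endpoint can be expressed as a host and a port. The class name ReachabilityCheck, the helper canConnect, and the host, port and timeout values are hypothetical and not part of the processor or of the NiFi API.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ReachabilityCheck {

    // Hypothetical helper: returns true if a TCP connection to host:port
    // can be opened within timeoutMillis, false otherwise.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // InetSocketAddress rejects out-of-range ports with IllegalArgumentException.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException | IllegalArgumentException ex) {
            // Unknown host, refused connection, timeout, or invalid port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed example endpoint; replace with the configured host and port.
        System.out.println(canConnect("localhost", 9092, 2000));
    }
}
```

A check like this only shows that something is listening on the port at scheduling time; the endpoint can still become unavailable later, so the failure handling in onTrigger is still needed.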

Now, all my doubts are about the onTrigger method, as I am still struggling
to understand what should really happen there. I guess that what I want is
to call the method that builds the HTTP POST request and then do the same
for the posting to Kafka. Am I doing too many things in my onTrigger? My
call to postToKafka will look something like this (I am using an
AtomicReference now):

final AtomicReference<String> postToKafkaHolder = new AtomicReference<>();

        session.read(flowFile, new InputStreamCallback() {

            @Override
            public void process(InputStream in) throws IOException {

                StringWriter strWriter = new StringWriter();
                IOUtils.copy(in, strWriter, "UTF-8");
                String contents = strWriter.toString();
                postToKafkaHolder.set(contents);
            }

        });

        try {
            postToKafka(postToKafkaHolder.get());
            session.transfer(flowFile, SUCCESS);
        } catch (IOException e) {
            session.transfer(flowFile, FAILURE);
        }
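
To make the two-step idea concrete outside of NiFi, here is a rough stdlib-only sketch of the flow described above: read the content, run the HTTP step, hand its response to the Kafka step, and pick a relationship name depending on whether either step threw. The interfaces HttpStep and KafkaStep, the class TwoStepFlowSketch, and the "success"/"failure" strings are hypothetical stand-ins for postHTTPRequest, postToKafka and the SUCCESS/FAILURE relationships; none of this is NiFi API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class TwoStepFlowSketch {

    /** Hypothetical stand-in for postHTTPRequest: sends a body, returns the JSON response. */
    interface HttpStep { String post(String body) throws IOException; }

    /** Hypothetical stand-in for postToKafka. */
    interface KafkaStep { void send(String json) throws IOException; }

    // Reads the whole stream into a UTF-8 string, like the InputStreamCallback does.
    static String readFully(InputStream in) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, n);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    // Runs both steps in order and returns the name of the relationship to route to.
    static String process(InputStream content, HttpStep http, KafkaStep kafka) {
        try {
            String body = readFully(content);
            String response = http.post(body); // step 1: HTTP POST
            kafka.send(response);              // step 2: publish the response
            return "success";
        } catch (IOException e) {
            // A failure in either step routes the content to failure.
            return "failure";
        }
    }

    public static void main(String[] args) {
        InputStream in = new ByteArrayInputStream("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        // Fake steps so the sketch runs without a network.
        String route = process(in, body -> body, json -> System.out.println("would send: " + json));
        System.out.println(route);
    }
}
```

Written this way, the two steps are independent of each other, which also makes it easy to see how they could be split into two processors connected by a relationship if one processor turns out to be doing too much.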

Similarly, the same would be done for postHTTPRequest; would it be correct
to do that in the same onTrigger? I apologise for such a newbie question,
but I am still confused about how to work with the concept of a FlowFile.

Thank you so much for your help!





--
View this message in context: 
http://apache-nifi-developer-list.39713.n7.nabble.com/Is-my-custom-processor-doing-too-many-things-OnTrigger-question-tp9225.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.
