Are you working in GitHub? That's a pretty easy way to share code, I
can fork your repo/branch and issue pull requests, or vice versa. If
you are working from a fork of the NiFi repo, I think you can make me
(mattyb149) a contributor to your fork/branch, and we'd go from there.
The latter is a cleaner solution IMO, since once it's done we can
squash the commits and issue the PR directly to the NiFi project for
review/merge.

If you're not on GitHub, we can figure something else out. Please feel
free to DM me at [email protected] :)

Thanks,
Matt

On Thu, Jun 9, 2016 at 10:51 AM, Shaine Berube
<[email protected]> wrote:
> @Matt
> I would enjoy collaborating on it. I'm thinking perhaps with some
> additional eyes this data flow can get even faster and have some
> additional bugs worked out.  At present, because the necessity of the
> situation required me to mostly hard-code the SQL queries, the data
> flow only works between an MS SQL database and a MySQL database, MS
> SQL being the source and MySQL being the target; but with some small
> changes, and perhaps an added library, that can be changed/fixed so
> that this data flow will work with any database as the source and any
> database as the target.
>
> As far as collaboration, what files should I send and where?
>
> On Thu, Jun 9, 2016 at 7:58 AM, Matt Burgess <[email protected]> wrote:
>
>> Shaine,
>>
>> I was about to start work on processor(s) almost exactly like the ones
>> you are describing. Is this something you'd be interested in
>> collaborating on? I've worked on the SQL generation piece before
>> (though not for NiFi).
>>
>> Regards,
>> Matt
>>
>> On Wed, Jun 8, 2016 at 5:29 PM, Shaine Berube
>> <[email protected]> wrote:
>> > Thanks for your responses,
>> >
>> > As far as possibly contributing these back to the community: I need
>> > a good way for them to connect to any database and generate that
>> > database's specific flavor of SQL, so when I get that functionality
>> > built out I would be glad to.
>> >
>> > As far as the memory goes, step two sends an insert query per flow
>> > file (for migration), and each query is designed to pull out 1,000
>> > records, if that makes more sense.  But it's good to know that NiFi
>> > can handle a lot of flow files; with back-pressure configured, it
>> > should wait for the queue ahead to clear out before starting the
>> > next table.
>> > Also, I forgot to mention: this is interacting with two live
>> > databases but is going through my VM. In other words, it could
>> > actually be faster if placed on the machine the target database is
>> > running on.
>> >
>> > Fun facts - I'm running benchmarking now; the speeds I'm seeing are
>> > because of the concurrent processing functionality of NiFi:
>> > 562,000 records inserted from 1 source table into 1 target table in
>> > 8 minutes 14 seconds (I'm being throttled by the source database).
>> > At that speed, it is approximately 1,137 records per second.
>> >
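The benchmark arithmetic above checks out: 8 minutes 14 seconds is 494 seconds, and 562,000 records over that span rounds down to 1,137 records per second. A trivial sketch of the calculation:

```java
// Quick check of the benchmark arithmetic: 562,000 records in
// 8 minutes 14 seconds (494 seconds) is about 1,137 records per second.
public class ThroughputCheck {

    // Integer records-per-second throughput for a run of the given duration.
    public static long recordsPerSecond(long records, long minutes, long seconds) {
        return records / (minutes * 60 + seconds);
    }

    public static void main(String[] args) {
        System.out.println(recordsPerSecond(562_000, 8, 14)); // prints 1137
    }
}
```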
>> > Step two is running 1 thread, step three is running 60 threads, and
>> > step four is running 30 threads.
>> >
>> > On Wed, Jun 8, 2016 at 1:23 PM, Mark Payne <[email protected]> wrote:
>> >
>> >> Shaine,
>> >>
>> >> This is a really cool set of functionality! Any chance you would be
>> >> interested in contributing
>> >> these processors back to the NiFi community?
>> >>
>> >> Regardless, one thing to consider here is that with NiFi, because of
>> >> the way that the repositories are structured, the way that we think
>> >> about heap utilization is a little different than with most projects.
>> >> As Bryan pointed out, you will want to stream the content directly to
>> >> the FlowFile, rather than buffering in memory. The framework will
>> >> handle the rest. Where we will be more concerned about heap
>> >> utilization is actually in the number of FlowFiles that are held in
>> >> memory at any one time, not the size of those FlowFiles.
>> >> So you will be better off keeping a smaller number of FlowFiles, each
>> >> having larger content. So I would recommend making the number of
>> >> records per FlowFile configurable, perhaps with a default value of
>> >> 25,000. This would also result in far fewer JDBC calls, which should
>> >> be beneficial performance-wise.
>> >> NiFi will handle swapping FlowFiles to disk when they are queued up,
>> >> so you can certainly queue up millions of FlowFiles in a single queue
>> >> without exhausting your heap space. However, if you are buffering up
>> >> all of those FlowFiles in your processor, you may run into problems,
>> >> so using a smaller number of FlowFiles, each with many thousands of
>> >> records, will likely provide the best heap utilization.
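The trade-off Mark describes, fewer FlowFiles each carrying larger content, is easy to see in the chunk counts. A minimal plain-Java sketch of the batching math (not NiFi code):

```java
// Fewer, larger chunks mean fewer FlowFiles held in memory and fewer
// JDBC round trips. Plain-Java sketch of the math behind the suggestion.
public class ChunkPlanner {

    // Number of FlowFiles needed to cover totalRecords at chunkSize records each.
    public static long chunkCount(long totalRecords, long chunkSize) {
        return (totalRecords + chunkSize - 1) / chunkSize; // ceiling division
    }

    public static void main(String[] args) {
        // A 10-million-record table:
        System.out.println(chunkCount(10_000_000, 1_000));  // prints 10000
        System.out.println(chunkCount(10_000_000, 25_000)); // prints 400
    }
}
```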
>> >>
>> >> Does this help?
>> >>
>> >> Thanks
>> >> -Mark
>> >>
>> >>
>> >> > On Jun 8, 2016, at 2:05 PM, Bryan Bende <[email protected]> wrote:
>> >> >
>> >> > Thank you for the detailed explanation! It sounds like you have built
>> >> > something very cool here.
>> >> >
>> >> > I'm still digesting the different steps and thinking of what can be
>> >> > done, but something that initially jumped out at me was when you
>> >> > mentioned considering how much memory NiFi has and not wanting to
>> >> > go over 1000 records per chunk...
>> >> >
>> >> > You should be able to read and write the chunks in a streaming
>> >> > fashion and never have the entire chunk in memory. For example,
>> >> > when creating the chunk you would be looping over a ResultSet from
>> >> > the database and writing each record to the OutputStream of the
>> >> > FlowFile, never having all 1000 records in memory. On the downstream
>> >> > processor you would read the records from the InputStream of the
>> >> > FlowFile, sending each one to the destination database, again not
>> >> > having all 1000 records in memory. If you can operate like this then
>> >> > having 1000 records per chunk, or 100,000 records per chunk,
>> >> > shouldn't change the memory requirement for NiFi.
>> >> >
>> >> > An example of what we do for ExecuteSQL and QueryDatabaseTable is in
>> >> > the JdbcCommon util where it converts the ResultSet to Avro records
>> >> > by writing to the OutputStream:
>> >> >
>> >> > https://github.com/apache/nifi/blob/e4b7e47836edf47042973e604005058c28eed23b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java#L80
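The same streaming pattern can be sketched outside NiFi. In this illustration the Iterator stands in for a JDBC ResultSet and the OutputStream for the FlowFile content stream; it is not the JdbcCommon code itself, just the shape of it:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

// Sketch of the streaming pattern Bryan describes: each record is written to
// the output as it is produced, so memory use stays flat no matter how large
// the chunk is. (Illustrative only, not the NiFi API.)
public class StreamingWriteDemo {

    // Write each record as one CSV line, never materializing the whole chunk.
    public static long writeRecords(Iterator<String[]> records, OutputStream out) {
        long count = 0;
        try {
            Writer w = new BufferedWriter(
                    new OutputStreamWriter(out, StandardCharsets.UTF_8));
            while (records.hasNext()) {
                w.write(String.join(",", records.next()));
                w.write('\n');
                count++;
            }
            w.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public static void main(String[] args) {
        java.util.List<String[]> rows = java.util.List.of(
                new String[] {"1", "alice"},
                new String[] {"2", "bob"});
        java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
        long n = writeRecords(rows.iterator(), out);
        System.out.println(n + " records written");
    }
}
```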
>> >> >
>> >> > Another point is that it is not necessarily a bad thing to have,
>> >> > say, 10,000 Flow Files in a queue. The queue is not actually holding
>> >> > the content of those FlowFiles; it is only holding pointers to where
>> >> > the content is, and only when the next processor does a
>> >> > session.read(flowFile, ...) does it then read in the content as a
>> >> > stream. In general NiFi should be able to handle 10s of thousands,
>> >> > or even 100s of thousands, of Flow Files sitting in a queue.
>> >> >
>> >> > With your current approach have you seen a specific issue, such as
>> >> > out-of-memory exceptions? Or were you just concerned by the number
>> >> > of flow files in the queue continuing to grow?
>> >> >
>> >> > I'll continue to think about this more, and maybe someone else on
>> >> > the list has additional ideas/thoughts.
>> >> >
>> >> > -Bryan
>> >> >
>> >> >
>> >> >
>> >> > On Wed, Jun 8, 2016 at 12:29 PM, Shaine Berube <
>> >> > [email protected]> wrote:
>> >> >
>> >> >> Perhaps I need to explain a little more about the data flow as a
>> >> >> whole.  But yes, Processor A is a custom-built processor.
>> >> >>
>> >> >> In explanation:
>> >> >> The data flow that I've built out is basically a four- to six-step
>> >> >> process (six steps because the first and last processors are
>> >> >> optional).  In the four-step process, step one gathers information
>> >> >> from the source and target databases in preparation for the
>> >> >> migration of data from source to target; this includes table
>> >> >> names, primary keys, and record counts. Step one then produces a
>> >> >> flow file per table, which in this case is 24 flow files.
>> >> >>
>> >> >> Step two of the process would be the equivalent of Processor A. In
>> >> >> step two I'm taking in a flow file and generating the SQL queries
>> >> >> that are going to be run.  The reason the back pressure doesn't
>> >> >> work, therefore, is because the processor is working on one file,
>> >> >> which corresponds to a table, and that table will be split into
>> >> >> 1000-record chunks with the SQL query splitting.  A fair few of
>> >> >> these tables, however, are over 10 million records, which means
>> >> >> that on a single execution this processor will generate over
>> >> >> 10,000 flow files (1000-record chunks).  As far as it goes, I
>> >> >> cannot save this information directly to the VM or server that I'm
>> >> >> running the data flow on, because the information can contain
>> >> >> extremely sensitive and secure data.  That being said, I need to
>> >> >> consider how much memory the NiFi process has to run, so I don't
>> >> >> want to go over 1000 records in a chunk.
>> >> >>
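A rough idea of what step two's query splitting could look like. The table and key names here are invented for illustration, and the OFFSET/FETCH paging is SQL Server 2012+ syntax; as Shaine notes, a real implementation would need to emit each database's own SQL flavor:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of step two: given a table's record count, emit one
// paged SELECT per 1000-record chunk. Names and syntax are illustrative,
// not taken from the actual processors discussed in the thread.
public class QuerySplitter {

    public static List<String> pagedQueries(String table, String orderKey,
                                            long recordCount, long chunkSize) {
        List<String> queries = new ArrayList<>();
        for (long offset = 0; offset < recordCount; offset += chunkSize) {
            queries.add("SELECT * FROM " + table
                    + " ORDER BY " + orderKey
                    + " OFFSET " + offset + " ROWS FETCH NEXT "
                    + chunkSize + " ROWS ONLY");
        }
        return queries;
    }

    public static void main(String[] args) {
        // A 10-million-record table at 1000 records per chunk:
        List<String> queries = pagedQueries("dbo.Orders", "OrderId", 10_000_000, 1000);
        System.out.println(queries.size()); // prints 10000 -- one flow file each
    }
}
```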
>> >> >> Step three of the process takes each individual flow file from
>> >> >> the queue, pulls the SQL query out of the flow file contents, runs
>> >> >> it against the source, and then puts the results in either a CSV
>> >> >> or an XML format into the contents of a flow file and sends it to
>> >> >> the next queue.
>> >> >>
>> >> >> Step four of the process takes the results out of the flow file
>> >> >> contents, sticks them into an SQL query, and runs it against the
>> >> >> target.
>> >> >>
>> >> >> Keep in mind: this data flow has been built to handle migration,
>> >> >> but it is also attempting to keep up to date
>> >> >> (incrementor/listener) with the source database.  Given that we
>> >> >> don't have full access to the source database, I'm basically
>> >> >> limited to running SELECT queries against it and gathering the
>> >> >> information I need to put into the target.  But this data flow is
>> >> >> configured to handle INSERT and UPDATE SQL queries, with DELETE
>> >> >> queries coming some time in the future.  The data flow is
>> >> >> configured so that step one can either be the migrator (full data
>> >> >> dump) or the incrementor (incremental data dump; use the
>> >> >> incrementor after the migrator has been run).
>> >> >>
>> >> >> Now, the six-step process adds a step before step one that allows
>> >> >> step one to be multi-threaded, and it adds a step after step four
>> >> >> that runs the queries (basically step four turns into the step
>> >> >> that generates queries for the next step to run).  If you want
>> >> >> more information on the six-step process I can give it, but it's
>> >> >> not necessary to know for the question I'm asking.
>> >> >>
>> >> >> So now, knowing more about the whole data flow model, is there a
>> >> >> way for me to do some sort of notification system on step two so
>> >> >> that we can keep the queue between step two and step three limited
>> >> >> to 2000?
>> >> >> Keep in mind, each step in the four-step process is another
>> >> >> processor.  So there are four processors involved in the data flow
>> >> >> that I'm working with; all four have been custom developed by
>> >> >> yours truly, and they work directly with the databases and run SQL
>> >> >> queries.
>> >> >>
>> >> >> On Wed, Jun 8, 2016 at 9:31 AM, Bryan Bende <[email protected]>
>> wrote:
>> >> >>
>> >> >>> Ok, I didn't realize you had already tried setting the
>> >> >>> back-pressure settings. Can you describe the processors a little
>> >> >>> more; are they custom processors?
>> >> >>>
>> >> >>> I am guessing that ProcessorA is producing all 5k flow files from
>> >> >>> a single execution of onTrigger, which would explain why
>> >> >>> back-pressure didn't solve the problem: back-pressure would stop
>> >> >>> the processor from executing again, but it's already too late
>> >> >>> because the first execution already went over the limit.
>> >> >>>
>> >> >>> Without knowing too much about what ProcessorA is doing, I'm
>> >> >>> wondering if there is a way to put some indirection between the
>> >> >>> two processors. What if ProcessorA sent its output to a PutFile
>> >> >>> processor that wrote all the chunks out to a directory, and then
>> >> >>> there was a separate GetFile processor that was concurrently
>> >> >>> picking up the chunks from that directory and sending them to
>> >> >>> ProcessorB?
>> >> >>>
>> >> >>> Then the back-pressure between GetFile and ProcessorB would work,
>> >> >>> because once the queue reached 2000, GetFile wouldn't pick up any
>> >> >>> more files. The downside is you would need enough disk space on
>> >> >>> your NiFi node to possibly store your whole database table, which
>> >> >>> may not be an option.
>> >> >>>
>> >> >>> Another idea might be to have two levels of chunks. For example,
>> >> >>> with the SplitText processor, if we want to split a file with 1
>> >> >>> million lines in it, rather than do one split producing 1 million
>> >> >>> flow files, we usually do a split to 10k chunks, then another
>> >> >>> split down to 1 line. Maybe ProcessorA could produce much larger
>> >> >>> chunks, say 10k or 100k records each, and then the next processor
>> >> >>> further splits those before going to ProcessorB.
>> >> >>> This would also allow back-pressure to work a little better
>> >> >>> between the second split processor and ProcessorB.
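The two-level split can be sketched as: the first pass emits coarse chunks, and the second pass splits each coarse chunk further, so back-pressure has a chance to engage between the passes. A plain-Java illustration (not NiFi's SplitText itself):

```java
import java.util.ArrayList;
import java.util.List;

// Two-level splitting: instead of one processor emitting a million
// single-record flow files in one execution, a first pass emits coarse
// chunks and a second pass refines them, one coarse chunk at a time.
public class TwoLevelSplit {

    // Split items into consecutive chunks of at most chunkSize elements.
    public static <T> List<List<T>> split(List<T> items, int chunkSize) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += chunkSize) {
            out.add(items.subList(i, Math.min(i + chunkSize, items.size())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> lines = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) lines.add(i);

        // First split: 100 coarse chunks of 10k lines each.
        List<List<Integer>> coarse = split(lines, 10_000);
        System.out.println(coarse.size()); // prints 100

        // Second split: one coarse chunk at a time becomes 10,000
        // single-line chunks, paced by downstream back-pressure.
        List<List<Integer>> fine = split(coarse.get(0), 1);
        System.out.println(fine.size()); // prints 10000
    }
}
```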
>> >> >>>
>> >> >>> If anyone else has ideas here, feel free to chime in.
>> >> >>>
>> >> >>> Thanks,
>> >> >>>
>> >> >>> Bryan
>> >> >>>
>> >> >>> On Wed, Jun 8, 2016 at 10:51 AM, Shaine Berube <
>> >> >>> [email protected]> wrote:
>> >> >>>
>> >> >>>> I do need more information, because I tried using that option,
>> >> >>>> but the processor just continued filling the queue anyway. I
>> >> >>>> told it to only allow 2000 before back pressure kicks in, but it
>> >> >>>> kept going and I ended up with 5k files in the queue before I
>> >> >>>> restarted NiFi to get the processor to stop.
>> >> >>>>
>> >> >>>> On Wed, Jun 8, 2016 at 8:45 AM, Bryan Bende <[email protected]>
>> wrote:
>> >> >>>>
>> >> >>>>> Hello,
>> >> >>>>>
>> >> >>>>> Take a look at the options available when right-clicking on a
>> >> >>>>> queue...
>> >> >>>>> What you described is what NiFi calls back-pressure. You can
>> >> >>>>> configure a queue to have an object threshold (# of flow files)
>> >> >>>>> or a data size threshold (total size of all flow files).
>> >> >>>>> When one of these thresholds is reached, NiFi will no longer
>> >> >>>>> let the source processor run until the condition goes back
>> >> >>>>> under the threshold.
>> >> >>>>>
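The object-threshold behavior described here has a close analogy in a bounded queue: a fast producer blocks once the threshold is hit and resumes only as the consumer drains. A minimal sketch using plain `java.util.concurrent` (not the NiFi framework itself):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Bounded-queue analogy for NiFi back-pressure: the producer's put()
// blocks whenever the queue is at capacity, so the queue can never
// grow past its threshold no matter how fast the producer runs.
public class BackPressureDemo {

    // Produce `total` items through a queue of the given capacity and
    // return the largest queue size the consumer ever observed.
    public static int run(int total, int capacity) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < total; i++) {
                try {
                    queue.put(i); // blocks when the queue is full
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();

        int maxSeen = 0;
        try {
            for (int consumed = 0; consumed < total; consumed++) {
                maxSeen = Math.max(maxSeen, queue.size());
                queue.take(); // draining frees room; the producer resumes
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen;
    }

    public static void main(String[] args) {
        // 10,000 items through a 2000-item queue: the observed size
        // never exceeds the 2000 threshold.
        System.out.println("max queued: " + run(10_000, 2000));
    }
}
```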
>> >> >>>>> Let us know if you need any more info on this.
>> >> >>>>>
>> >> >>>>> Thanks,
>> >> >>>>>
>> >> >>>>> Bryan
>> >> >>>>>
>> >> >>>>> On Wed, Jun 8, 2016 at 10:40 AM, Shaine Berube <
>> >> >>>>> [email protected]> wrote:
>> >> >>>>>
>> >> >>>>>> Hello all,
>> >> >>>>>>
>> >> >>>>>> I'm kind of new to developing NiFi, though I've been doing
>> >> >>>>>> some pretty in-depth stuff and some advanced database queries.
>> >> >>>>>> My question is regarding the queues between processors: I want
>> >> >>>>>> to limit a queue to... say 2000. How would I go about doing
>> >> >>>>>> that?  Or better yet, how would I tell the processor
>> >> >>>>>> generating the queue to only put a max of 2000 files into the
>> >> >>>>>> queue?
>> >> >>>>>>
>> >> >>>>>> Allow me to explain with a scenario:
>> >> >>>>>> We are doing data migration from one database to another.
>> >> >>>>>> -Processor A is generating a queue consumed by Processor B
>> >> >>>>>> -Processor A is taking configuration and generating SQL
>> >> >>>>>> queries in 1000-record chunks so that Processor B can insert
>> >> >>>>>> them into a new database.
>> >> >>>>>> Given the size of the source database, Processor A can
>> >> >>>>>> potentially generate hundreds of thousands of files.
>> >> >>>>>>
>> >> >>>>>> Is there a way for Processor A to check its downstream queue
>> >> >>>>>> for the queue size?  How would I get Processor A to only put
>> >> >>>>>> 2000 files into the queue at any given time, so that Processor
>> >> >>>>>> A can continue running but wait for room in the queue?
>> >> >>>>>>
>> >> >>>>>> Thank you in advance.
>> >> >>>>>>
>> >> >>>>>> --
>> >> >>>>>> *Shaine Berube*
>> >> >>>>>>
>> >> >>>>>
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>> --
>> >> >>>> *Shaine Berube*
>> >> >>>>
>> >> >>>
>> >> >>
>> >> >>
>> >> >>
>> >> >> --
>> >> >> *Shaine Berube*
>> >> >>
>> >>
>> >>
>> >
>> >
>> > --
>> > *Shaine Berube*
>>
>
>
>
> --
> *Shaine Berube*
