I am on GitHub, but this project isn't hosted there, and the folder I'm
developing in also contains some proprietary code, so I'll create a zip
file of the Java files for this project.  You should be able to gather
and link in the libraries yourself; most of them are pretty standard.

On Thu, Jun 9, 2016 at 9:09 AM, Matt Burgess <[email protected]> wrote:

> Are you working in GitHub? That's a pretty easy way to share code, I
> can fork your repo/branch and issue pull requests, or vice versa. If
> you are working from a fork of the NiFi repo, I think you can make me
> (mattyb149) a contributor to your fork/branch, and we'd go from there.
> The latter is a cleaner solution IMO, since once it's done we can
> squash the commits and issue the PR directly to the NiFi project for
> review/merge.
>
> If you're not on GitHub, we can figure something else out. Please feel
> free to DM me at [email protected] :)
>
> Thanks,
> Matt
>
> On Thu, Jun 9, 2016 at 10:51 AM, Shaine Berube
> <[email protected]> wrote:
> > @Matt
> > I would enjoy collaborating on it. I'm thinking that with some
> > additional eyes this data flow can get even faster and get some
> > additional bugs worked out.  Presently, because the situation required
> > me to mostly hard-code the SQL queries, the data flow only works between
> > an MS SQL and a MySQL database, MS SQL being the source and MySQL being
> > the target, but with some small changes and perhaps an added library
> > that can be changed / fixed so that this data flow will work with any
> > database as the source and any database as the target.
> >
> > As far as collaboration, what files should I send and where?
> >
> > On Thu, Jun 9, 2016 at 7:58 AM, Matt Burgess <[email protected]> wrote:
> >
> >> Shaine,
> >>
> >> I was about to start work on processor(s) almost exactly like the ones
> >> you are describing. Is this something you'd be interested in
> >> collaborating on? I've worked on the SQL generation piece before
> >> (though not for NiFi).
> >>
> >> Regards,
> >> Matt
> >>
> >> On Wed, Jun 8, 2016 at 5:29 PM, Shaine Berube
> >> <[email protected]> wrote:
> >> > Thanks for your responses,
> >> >
> >> > As far as possibly contributing these back to the community, I need a
> >> > good way for them to connect to any database and generate that
> >> > database's specific flavor of SQL, so when I get that functionality
> >> > built out I would be glad to.
> >> >
> >> > As far as the memory goes, step two sends an insert query per flow
> >> > file (for migration), and each query is designed to pull out 1000
> >> > records, if that makes more sense.  But it's good to know that NiFi can
> >> > handle a lot of flow files; with the back-pressure configured it should
> >> > wait for the queue ahead to clear out before starting the next table.
> >> > Also, I forgot to mention that this is interacting with two live
> >> > databases but is going through my VM, so in other words, it can actually
> >> > be faster if placed on the machine the target database is running on.
> >> >
> >> > Fun facts - I'm running benchmarks now; the speeds I'm seeing are
> >> > because of the concurrent processing functionality of NiFi.
> >> > 562,000 records inserted from 1 source table into 1 target table -
> >> > speed: 8 minutes 14 seconds (I'm being throttled by the source
> >> > database).
> >> > At that speed, it is approximately 1,137 records per second.
> >> >
> >> > Step two is running 1 thread, step three is running 60 threads, step
> >> > 4 is running 30 threads.
> >> >
> >> > On Wed, Jun 8, 2016 at 1:23 PM, Mark Payne <[email protected]> wrote:
> >> >
> >> >> Shaine,
> >> >>
> >> >> This is a really cool set of functionality! Any chance you would be
> >> >> interested in contributing these processors back to the NiFi community?
> >> >>
> >> >> Regardless, one thing to consider here is that with NiFi, because of
> >> >> the way that the repositories are structured, the way that we think
> >> >> about heap utilization is a little different than with most projects.
> >> >> As Bryan pointed out, you will want to stream the content directly to
> >> >> the FlowFile, rather than buffering in memory. The framework will
> >> >> handle the rest. Where we will be more concerned about heap utilization
> >> >> is actually in the number of FlowFiles that are held in memory at any
> >> >> one time, not the size of those FlowFiles. So you will be better off
> >> >> keeping a smaller number of FlowFiles, each having larger content. So I
> >> >> would recommend making the number of records per FlowFile configurable,
> >> >> perhaps with a default value of 25,000. This would also result in far
> >> >> fewer JDBC calls, which should be beneficial performance-wise.
> >> >> NiFi will handle swapping FlowFiles to disk when they are queued up,
> >> >> so you can certainly queue up millions of FlowFiles in a single queue
> >> >> without exhausting your heap space. However, if you are buffering up
> >> >> all of those FlowFiles in your processor, you may run into problems,
> >> >> so using a smaller number of FlowFiles, each with many thousands of
> >> >> records, will likely provide the best heap utilization.
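
To put rough numbers on the recommendation above, here is a minimal sketch in plain Java (no NiFi API involved) that counts how many FlowFiles a table would produce at a given records-per-FlowFile setting. The class and method names are hypothetical, and the 10 million figure is the approximate table size mentioned elsewhere in this thread.

```java
final class FlowFileCount {
    /**
     * Number of FlowFiles needed to carry totalRecords when each FlowFile
     * holds at most recordsPerFlowFile records (ceiling division).
     */
    static long flowFilesNeeded(long totalRecords, long recordsPerFlowFile) {
        if (totalRecords < 0 || recordsPerFlowFile <= 0) {
            throw new IllegalArgumentException(
                "totalRecords must be >= 0 and recordsPerFlowFile must be > 0");
        }
        return (totalRecords + recordsPerFlowFile - 1) / recordsPerFlowFile;
    }

    public static void main(String[] args) {
        long tableSize = 10_000_000L; // approximate size of the larger tables in this thread
        System.out.println(flowFilesNeeded(tableSize, 1_000));  // prints 10000
        System.out.println(flowFilesNeeded(tableSize, 25_000)); // prints 400
    }
}
```

With the suggested default of 25,000, the same table yields 400 FlowFiles instead of 10,000, which is the reduction in object count that matters for heap usage.
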
> >> >>
> >> >> Does this help?
> >> >>
> >> >> Thanks
> >> >> -Mark
> >> >>
> >> >>
> >> >> > On Jun 8, 2016, at 2:05 PM, Bryan Bende <[email protected]> wrote:
> >> >> >
> >> >> > Thank you for the detailed explanation! It sounds like you have built
> >> >> > something very cool here.
> >> >> >
> >> >> > I'm still digesting the different steps and thinking of what can be
> >> >> > done, but something that initially jumped out at me was when you
> >> >> > mentioned considering how much memory NiFi has and not wanting to go
> >> >> > over 1000 records per chunk...
> >> >> >
> >> >> > You should be able to read and write the chunks in a streaming
> >> >> > fashion and never have the entire chunk in memory. For example, when
> >> >> > creating the chunk you would be looping over a ResultSet from the
> >> >> > database and writing each record to the OutputStream of the FlowFile,
> >> >> > never having all 1000 records in memory. In the downstream processor
> >> >> > you would read each record from the InputStream of the FlowFile,
> >> >> > sending each one to the destination database, again not having all
> >> >> > 1000 records in memory. If you can operate like this, then having 1000
> >> >> > records per chunk, or 100,000 records per chunk, shouldn't change the
> >> >> > memory requirement for NiFi.
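
A minimal sketch of the streaming pattern described above, in plain Java with no NiFi or JDBC dependency. An Iterator<String> stands in for the ResultSet loop and a Consumer<String> stands in for the insert into the destination database; those stand-ins, the one-record-per-line format, and the class and method names are all assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.function.Consumer;

final class StreamingChunk {
    /**
     * Writes one record per line. Only the current record and the writer's
     * fixed-size buffer are held in memory, regardless of the record count.
     */
    static long writeRecords(Iterator<String> records, OutputStream out) throws IOException {
        BufferedWriter writer =
            new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        long count = 0;
        while (records.hasNext()) {
            writer.write(records.next());
            writer.write('\n');
            count++;
        }
        writer.flush(); // flush only; the caller owns the stream and closes it
        return count;
    }

    /** Reads one record per line and hands each to the sink as soon as it is read. */
    static long readRecords(InputStream in, Consumer<String> sink) throws IOException {
        BufferedReader reader =
            new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        long count = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            sink.accept(line);
            count++;
        }
        return count;
    }
}
```

In a real processor the two streams would be the ones NiFi hands to the processor for the FlowFile content, and the memory footprint would stay the same whether a chunk holds 1000 records or 100,000.
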
> >> >> >
> >> >> > An example of what we do for ExecuteSQL and QueryDatabaseTable is in
> >> >> > the JdbcCommon util, where it converts the ResultSet to Avro records by
> >> >> > writing to the OutputStream:
> >> >> >
> >> >> > https://github.com/apache/nifi/blob/e4b7e47836edf47042973e604005058c28eed23b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java#L80
> >> >> >
> >> >> > Another point is that it is not necessarily a bad thing to have, say,
> >> >> > 10,000 FlowFiles in a queue. The queue is not actually holding the
> >> >> > content of those FlowFiles; it is only holding pointers to where the
> >> >> > content is, and only when the next processor does a
> >> >> > session.read(flowFile, ...) does it then read in the content as a
> >> >> > stream. In general NiFi should be able to handle tens of thousands, or
> >> >> > even hundreds of thousands, of FlowFiles sitting in a queue.
> >> >> >
> >> >> > With your current approach, have you seen a specific issue, such as
> >> >> > out-of-memory exceptions? Or were you just concerned by the number of
> >> >> > flow files in the queue continuing to grow?
> >> >> >
> >> >> > I'll continue to think about this more, and maybe someone else on the
> >> >> > list has additional ideas/thoughts.
> >> >> >
> >> >> > -Bryan
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Wed, Jun 8, 2016 at 12:29 PM, Shaine Berube <
> >> >> > [email protected]> wrote:
> >> >> >
> >> >> >> Perhaps I need to explain a little more about the data flow as a
> >> >> >> whole.  But yes, Processor A is a custom built processor.
> >> >> >>
> >> >> >> In explanation:
> >> >> >> The data flow that I've built out is basically a 4 to 6 step process
> >> >> >> (6 steps because the first and last processors are optional).  In the
> >> >> >> four step process, step one gathers information from the source and
> >> >> >> target databases in preparation for the migration of data from source
> >> >> >> to target; this includes table names, primary keys, and record counts.
> >> >> >> Step one then produces a flow file per table, which in this case is 24
> >> >> >> flow files.
> >> >> >>
> >> >> >> Step two of the process would be the equivalent of Processor A. In
> >> >> >> step two I'm taking in a flow file and generating the SQL queries that
> >> >> >> are going to be run.  The reason the back pressure doesn't work,
> >> >> >> therefore, is that the processor is working on one file, which
> >> >> >> corresponds to a table, and that table will be split into 1000 record
> >> >> >> chunks by the SQL query splitting.  A fair few of these tables,
> >> >> >> however, are over 10 million records, which means that on a single
> >> >> >> execution, this processor will generate over 10,000 flow files (1000
> >> >> >> record chunks).  As far as it goes, I cannot save this information
> >> >> >> directly to the VM or server that I'm running the data flow on,
> >> >> >> because the information can contain extremely sensitive and secure
> >> >> >> data.  That being said, I need to consider how much memory the NiFi
> >> >> >> process has to run, so I don't want to go over 1000 records in a
> >> >> >> chunk.
> >> >> >>
> >> >> >> Step three of the process takes each individual flow file from the
> >> >> >> queue, pulls the SQL query out of the flow file contents, runs it
> >> >> >> against source, and then puts the results in either a CSV or an XML
> >> >> >> format into the contents of a flow file and sends it to the next
> >> >> >> queue.
> >> >> >>
> >> >> >> Step four of the process takes the results out of the flow file
> >> >> >> contents, sticks them into an SQL query and runs it against target.
> >> >> >>
> >> >> >> Keep in mind: this data flow has been built to handle migration, but
> >> >> >> it is also attempting to keep up to date (incrementor/listener) with
> >> >> >> the source database.  Given that we don't have full access to the
> >> >> >> source database, I'm basically limited to running select queries
> >> >> >> against it and gathering the information I need to put into target.
> >> >> >> But this data flow is configured to handle INSERT and UPDATE SQL
> >> >> >> queries, with DELETE queries coming some time in the future.  The data
> >> >> >> flow is configured so that step one can either be the migrator (full
> >> >> >> data dump), or the incrementor (incremental data dump, use incrementor
> >> >> >> after migrator has been run).
> >> >> >>
> >> >> >> Now, the six step process adds a step before step one that allows step
> >> >> >> one to be multi-threaded, and it adds a step after step four that runs
> >> >> >> the queries (basically step four turns into the step that generates
> >> >> >> queries for the next step to run).  If you want more information on
> >> >> >> the six step process I can give it, but it's not necessary to know for
> >> >> >> the question I'm asking.
> >> >> >>
> >> >> >> So now, knowing more about the whole data flow model, is there a way
> >> >> >> for me to do some sort of notification system on step two so that we
> >> >> >> can keep the queue limited to 2000 between step two and step three?
> >> >> >> Keep in mind, each step in the four step process is another processor.
> >> >> >> So there are four processors involved in the data flow that I'm
> >> >> >> working with; all four have been custom developed by yours truly, and
> >> >> >> they work directly with the databases and run SQL queries.
> >> >> >>
> >> >> >> On Wed, Jun 8, 2016 at 9:31 AM, Bryan Bende <[email protected]> wrote:
> >> >> >>
> >> >> >>> Ok, I didn't realize you had already tried setting the back-pressure
> >> >> >>> settings. Can you describe the processors a little more? Are they
> >> >> >>> custom processors?
> >> >> >>>
> >> >> >>> I am guessing that ProcessorA is producing all 5k flow files from a
> >> >> >>> single execution of onTrigger, which would explain why back-pressure
> >> >> >>> didn't solve the problem: back-pressure would stop the processor from
> >> >> >>> executing again, but it's already too late because the first execution
> >> >> >>> already went over the limit.
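
Following from the explanation above, one possible workaround (an assumption, not something confirmed in this thread) is to have each execution emit only a bounded batch of chunks and remember where it stopped, so that back-pressure gets a chance to apply between executions. The sketch below is plain Java rather than NiFi API code; the class name, the offset-based chunking, and the per-call limit are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class BoundedChunkEmitter {
    private final long totalRecords;
    private final long chunkSize;
    private long nextOffset = 0; // position kept between calls

    BoundedChunkEmitter(long totalRecords, long chunkSize) {
        if (totalRecords < 0 || chunkSize <= 0) {
            throw new IllegalArgumentException("totalRecords must be >= 0 and chunkSize > 0");
        }
        this.totalRecords = totalRecords;
        this.chunkSize = chunkSize;
    }

    /**
     * Returns the starting offsets of at most maxPerCall chunks, then stops.
     * The next call resumes where this one left off.
     */
    List<Long> nextBatch(int maxPerCall) {
        List<Long> offsets = new ArrayList<>();
        while (offsets.size() < maxPerCall && nextOffset < totalRecords) {
            offsets.add(nextOffset);
            nextOffset += chunkSize;
        }
        return offsets;
    }

    /** True once every chunk of the table has been handed out. */
    boolean isDone() {
        return nextOffset >= totalRecords;
    }
}
```

For a table of 5000 records in 1000 record chunks, calling nextBatch(2) three times hands out offsets [0, 1000], then [2000, 3000], then [4000], instead of producing every chunk in one execution.
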
> >> >> >>>
> >> >> >>> Without knowing too much about what ProcessorA is doing, I'm wondering
> >> >> >>> if there is a way to put some indirection between the two processors.
> >> >> >>> What if ProcessorA sent its output to a PutFile processor that wrote
> >> >> >>> all the chunks out to a directory, and then there was a separate
> >> >> >>> GetFile processor that was concurrently picking up the chunks from
> >> >> >>> that directory and sending them to ProcessorB?
> >> >> >>>
> >> >> >>> Then the back-pressure between GetFile and ProcessorB would work
> >> >> >>> because once the queue reached 2000, GetFile wouldn't pick up any more
> >> >> >>> files. The downside is you would need enough disk space on your NiFi
> >> >> >>> node to possibly store your whole database table, which may not be an
> >> >> >>> option.
> >> >> >>>
> >> >> >>> Another idea might be to have two levels of chunks. For example, with
> >> >> >>> the SplitText processor, if we want to split a file with 1 million
> >> >> >>> lines in it, rather than do one split producing 1 million flow files,
> >> >> >>> we usually do a split to 10k chunks, then another split down to 1
> >> >> >>> line. Maybe ProcessorA could produce much larger chunks, say 10k or
> >> >> >>> 100k records each, then the next processor further splits those before
> >> >> >>> going to ProcessorB. This would also allow back-pressure to work a
> >> >> >>> little better between the second split processor and ProcessorB.
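
The two-level split described above can be sketched in plain Java as a generic partition step applied twice. This is only an illustration of the idea, not how SplitText is implemented; the class and method names are hypothetical, and it assumes "10k chunks" means chunks of 10,000 lines each.

```java
import java.util.ArrayList;
import java.util.List;

final class TwoLevelSplit {
    /** Splits items into consecutive parts of at most size elements each. */
    static <T> List<List<T>> partition(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be > 0");
        }
        List<List<T>> parts = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            parts.add(new ArrayList<>(items.subList(start, end)));
        }
        return parts;
    }
}
```

The first pass would be partition(lines, 10_000), and a later stage would call partition(chunk, 1) on each chunk as it arrives, so no single step has to produce a million outputs at once.
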
> >> >> >>>
> >> >> >>> If anyone else has ideas here, feel free to chime in.
> >> >> >>>
> >> >> >>> Thanks,
> >> >> >>>
> >> >> >>> Bryan
> >> >> >>>
> >> >> >>> On Wed, Jun 8, 2016 at 10:51 AM, Shaine Berube <
> >> >> >>> [email protected]> wrote:
> >> >> >>>
> >> >> >>>> I do need more information, because I tried using that option, but
> >> >> >>>> the processor just continued filling the queue anyway. I told it to
> >> >> >>>> only allow 2000 before back pressure kicks in, but it kept going and I
> >> >> >>>> ended up with 5k files in the queue before I restarted NiFi to get the
> >> >> >>>> processor to stop.
> >> >> >>>>
> >> >> >>>> On Wed, Jun 8, 2016 at 8:45 AM, Bryan Bende <[email protected]> wrote:
> >> >> >>>>
> >> >> >>>>> Hello,
> >> >> >>>>>
> >> >> >>>>> Take a look at the options available when right-clicking on a
> >> >> >>>>> queue... What you described is what NiFi calls back-pressure. You can
> >> >> >>>>> configure a queue to have an object threshold (# of flow files) or a
> >> >> >>>>> data size threshold (total size of all flow files).
> >> >> >>>>> When one of these thresholds is reached, NiFi will no longer let the
> >> >> >>>>> source processor run until the condition goes back under the
> >> >> >>>>> threshold.
> >> >> >>>>>
> >> >> >>>>> Let us know if you need any more info on this.
> >> >> >>>>>
> >> >> >>>>> Thanks,
> >> >> >>>>>
> >> >> >>>>> Bryan
> >> >> >>>>>
> >> >> >>>>> On Wed, Jun 8, 2016 at 10:40 AM, Shaine Berube <
> >> >> >>>>> [email protected]> wrote:
> >> >> >>>>>
> >> >> >>>>>> Hello all,
> >> >> >>>>>>
> >> >> >>>>>> I'm kind of new to developing for NiFi, though I've been doing some
> >> >> >>>>>> pretty in-depth stuff and some advanced database queries.  My question
> >> >> >>>>>> is regarding the queues between processors: I want to limit a queue
> >> >> >>>>>> to... say 2000; how would I go about doing that?  Or better yet, how
> >> >> >>>>>> would I tell the processor generating the queue to only put a max of
> >> >> >>>>>> 2000 files into the queue?
> >> >> >>>>>>
> >> >> >>>>>> Allow me to explain with a scenario:
> >> >> >>>>>> We are doing data migration from one database to another.
> >> >> >>>>>> -Processor A is generating a queue consumed by Processor B
> >> >> >>>>>> -Processor A is taking configuration and generating SQL queries in
> >> >> >>>>>> 1000 record chunks so that Processor B can insert them into a new
> >> >> >>>>>> database.
> >> >> >>>>>> Given the size of the source database, Processor A can potentially
> >> >> >>>>>> generate hundreds of thousands of files.
> >> >> >>>>>>
> >> >> >>>>>> Is there a way for Processor A to check its downstream queue for the
> >> >> >>>>>> queue size?  How would I get Processor A to only put 2000 files into
> >> >> >>>>>> the queue at any given time, so that Processor A can continue running
> >> >> >>>>>> but wait for room in the queue?
> >> >> >>>>>>
> >> >> >>>>>> Thank you in advance.
> >> >> >>>>>>
> >> >> >>>>>> --
> >> >> >>>>>> *Shaine Berube*
> >> >> >>>>>>
> >> >> >>>>>
> >> >> >>>>
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> --
> >> >> >>>> *Shaine Berube*
> >> >> >>>>
> >> >> >>>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> --
> >> >> >> *Shaine Berube*
> >> >> >>
> >> >>
> >> >>
> >> >
> >> >
> >> > --
> >> > *Shaine Berube*
> >>
> >
> >
> >
> > --
> > *Shaine Berube*
>



-- 
*Shaine Berube*
