Are you working in GitHub? That's a pretty easy way to share code: I can fork your repo/branch and issue pull requests, or vice versa. If you are working from a fork of the NiFi repo, I think you can make me (mattyb149) a contributor to your fork/branch, and we'd go from there. The latter is a cleaner solution IMO, since once it's done we can squash the commits and issue the PR directly to the NiFi project for review/merge.

If you're not on GitHub, we can figure something else out. Please feel free to DM me at [email protected] :)

Thanks,
Matt

On Thu, Jun 9, 2016 at 10:51 AM, Shaine Berube <[email protected]> wrote:
> @Matt
> I would enjoy collaborating on it; I'm thinking that with some additional eyes this data flow can get even faster and have some additional bugs worked out. Presently, because the situation required me to mostly hard-code the SQL queries, the data flow only works between an MS SQL and a MySQL database, MS SQL being the source and MySQL being the target, but with some small changes and perhaps an added library it can be changed so that this data flow will work with any database as the source and any database as the target.
>
> As far as collaboration, what files should I send and where?
>
> On Thu, Jun 9, 2016 at 7:58 AM, Matt Burgess <[email protected]> wrote:
>> Shaine,
>>
>> I was about to start work on processor(s) almost exactly like the ones you are describing. Is this something you'd be interested in collaborating on? I've worked on the SQL generation piece before (though not for NiFi).
>>
>> Regards,
>> Matt
>>
>> On Wed, Jun 8, 2016 at 5:29 PM, Shaine Berube <[email protected]> wrote:
>>> Thanks for your responses.
>>>
>>> As far as possibly contributing these back to the community, I need a good way for them to connect to any database and generate that database's specific flavor of SQL, so when I get that functionality built out I would be glad to.
>>>
>>> As far as the memory goes, step two sends an insert query per flow file (for migration), and each query is designed to pull out 1000 records, if that makes more sense. But it is good to know that NiFi can handle a lot of flow files; with back-pressure configured, it should wait for the queue ahead to clear out before starting the next table.
>>> I also forgot to mention that this is interacting with two live databases but is going through my VM; in other words, it could actually be faster if placed on the machine the target database is running on.
>>>
>>> Fun facts: I'm running benchmarking now, and the speeds I'm seeing are because of the concurrent processing functionality of NiFi.
>>> 562,000 records inserted from 1 source table into 1 target table - speed: 8 minutes 14 seconds (I'm being throttled by the source database).
>>> At that speed, it is approximately 1,137 records per second.
>>>
>>> Step two is running 1 thread, step three is running 60 threads, and step four is running 30 threads.
>>>
>>> On Wed, Jun 8, 2016 at 1:23 PM, Mark Payne <[email protected]> wrote:
>>>> Shaine,
>>>>
>>>> This is a really cool set of functionality! Any chance you would be interested in contributing these processors back to the NiFi community?
>>>>
>>>> Regardless, one thing to consider here is that with NiFi, because of the way that the repositories are structured, the way that we think about heap utilization is a little different than with most projects. As Bryan pointed out, you will want to stream the content directly to the FlowFile, rather than buffering in memory. The framework will handle the rest. Where we will be more concerned about heap utilization is actually in the number of FlowFiles that are held in memory at any one time, not the size of those FlowFiles.
>>>> So you will be better off keeping a smaller number of FlowFiles, each having larger content. I would recommend making the number of records per FlowFile configurable, perhaps with a default value of 25,000. This would also result in far fewer JDBC calls, which should be beneficial performance-wise.
>>>> NiFi will handle swapping FlowFiles to disk when they are queued up, so you can certainly queue up millions of FlowFiles in a single queue without exhausting your heap space. However, if you are buffering up all of those FlowFiles in your processor, you may run into problems, so using a smaller number of FlowFiles, each with many thousands of records, will likely provide the best heap utilization.
>>>>
>>>> Does this help?
>>>>
>>>> Thanks
>>>> -Mark
>>>>
>>>> On Jun 8, 2016, at 2:05 PM, Bryan Bende <[email protected]> wrote:
>>>>> Thank you for the detailed explanation! It sounds like you have built something very cool here.
>>>>>
>>>>> I'm still digesting the different steps and thinking about what can be done, but something that initially jumped out at me was when you mentioned considering how much memory NiFi has and not wanting to go over 1000 records per chunk...
>>>>>
>>>>> You should be able to read and write the chunks in a streaming fashion and never have the entire chunk in memory. For example, when creating the chunk you would be looping over a ResultSet from the database and writing each record to the OutputStream of the FlowFile, never having all 1000 records in memory. On the downstream processor you would read the records from the InputStream of the FlowFile, sending each one to the destination database, again never having all 1000 records in memory. If you can operate like this, then having 1000 records per chunk, or 100,000 records per chunk, shouldn't change the memory requirement for NiFi.
>>>>>
>>>>> An example of what we do for ExecuteSQL and QueryDatabaseTable is in the JdbcCommon util, where it converts the ResultSet to Avro records by writing to the OutputStream:
>>>>>
>>>>> https://github.com/apache/nifi/blob/e4b7e47836edf47042973e604005058c28eed23b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java#L80
>>>>>
>>>>> Another point is that it is not necessarily a bad thing to have, say, 10,000 FlowFiles in a queue. The queue is not actually holding the content of those FlowFiles; it is only holding pointers to where the content is, and only when the next processor does a session.read(flowFile, ...) does it then read in the content as a stream. In general, NiFi should be able to handle tens of thousands, or even hundreds of thousands, of FlowFiles sitting in a queue.
>>>>>
>>>>> With your current approach, have you seen a specific issue, such as out-of-memory exceptions? Or were you just concerned by the number of flow files in the queue continuing to grow?
>>>>>
>>>>> I'll continue to think about this more, and maybe someone else on the list has additional ideas/thoughts.
>>>>>
>>>>> -Bryan
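To make the streaming approach Bryan describes concrete, here is a minimal sketch that writes rows from a JDBC ResultSet straight into a FlowFile's content as it iterates, so only one row is ever held in memory. The class and method names are illustrative (this is not code from JdbcCommon or from the processors discussed in this thread), and the CSV formatting is deliberately naive.

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;

public class ResultSetStreamingExample {

    /**
     * Streams an open ResultSet into the FlowFile's content as simple CSV.
     * Only one row is held in memory at a time; the content repository
     * receives the bytes as they are written.
     */
    public static FlowFile writeResultSetToFlowFile(final ProcessSession session,
                                                    final FlowFile flowFile,
                                                    final ResultSet rs) {
        return session.write(flowFile, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                final BufferedWriter writer =
                        new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
                try {
                    final ResultSetMetaData meta = rs.getMetaData();
                    final int columnCount = meta.getColumnCount();
                    while (rs.next()) {
                        final StringBuilder row = new StringBuilder();
                        for (int i = 1; i <= columnCount; i++) {
                            if (i > 1) {
                                row.append(',');
                            }
                            final Object value = rs.getObject(i);
                            // Naive CSV: real code would escape delimiters and quotes.
                            row.append(value == null ? "" : value.toString());
                        }
                        writer.write(row.toString());
                        writer.newLine();
                    }
                } catch (final SQLException e) {
                    throw new IOException("Failed to read from ResultSet", e);
                }
                writer.flush();
            }
        });
    }
}

A caller inside onTrigger would obtain the ResultSet, create a FlowFile with session.create(), hand both to this helper, and then transfer the returned FlowFile downstream.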
>>>>> On Wed, Jun 8, 2016 at 12:29 PM, Shaine Berube <[email protected]> wrote:
>>>>>> Perhaps I need to explain a little more about the data flow as a whole. But yes, Processor A is a custom-built processor.
>>>>>>
>>>>>> In explanation:
>>>>>> The data flow that I've built out is basically a four-to-six-step process (six steps because the first and last processors are optional). In the four-step process, step one gathers information from the source and target databases in preparation for the migration of data from source to target; this includes table names, primary keys, and record counts. Step one then produces a flow file per table, which in this case is 24 flow files.
>>>>>>
>>>>>> Step two of the process is the equivalent of Processor A: it takes in a flow file and generates the SQL queries that are going to be run. The reason back-pressure doesn't work, therefore, is that the processor is working on one file, which corresponds to a table, and that table will be split into 1000-record chunks by the SQL query splitting. A fair few of these tables, however, are over 10 million records, which means that on a single execution this processor will generate over 10,000 flow files (1000-record chunks). I cannot save this information directly to the VM or server that I'm running the data flow on, because the information can contain extremely sensitive and secure data. That being said, I need to consider how much memory the NiFi process has to run, so I don't want to go over 1000 records in a chunk.
>>>>>>
>>>>>> Step three of the process takes each individual flow file from the queue, pulls the SQL query out of the flow file contents, runs it against the source, and then puts the results in either a CSV or an XML format into the contents of a flow file and sends it to the next queue.
>>>>>>
>>>>>> Step four of the process takes the results out of the flow file contents, puts them into an SQL query, and runs it against the target.
>>>>>>
>>>>>> Keep in mind: this data flow has been built to handle migration, but it is also attempting to keep up to date (incrementor/listener) with the source database. Given that we don't have full access to the source database, I'm basically limited to running select queries against it and gathering the information I need to put into the target. But this data flow is configured to handle INSERT and UPDATE SQL queries, with DELETE queries coming some time in the future. The data flow is configured so that step one can be either the migrator (full data dump) or the incrementor (incremental data dump, used after the migrator has been run).
>>>>>>
>>>>>> Now, the six-step process adds a step before step one that allows step one to be multi-threaded, and it adds a step after step four that runs the queries (basically, step four turns into the step that generates queries for the next step to run). If you want more information on the six-step process I can give it, but it's not necessary for the question I'm asking.
>>>>>>
>>>>>> So now, knowing more about the whole data flow model, is there a way for me to do some sort of notification system on step two so that we can keep the queue between step two and step three limited to 2000?
>>>>>> Keep in mind, each step in the four-step process is a separate processor. So there are four processors involved in the data flow that I'm working with; all four have been custom developed by yours truly, and they work directly with the databases and run SQL queries.
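As a rough illustration of the chunked query generation that step two performs, the sketch below splits a table into fixed-size ranges over a numeric primary key. The table and column names are hypothetical, the SQL is generic rather than MS SQL-specific, and it assumes reasonably dense keys (gaps simply yield smaller chunks).

import java.util.ArrayList;
import java.util.List;

public class ChunkedQueryGeneratorExample {

    /**
     * Builds one SELECT statement per chunk by ranging over a numeric primary
     * key, so each query pulls at most chunkSize records from the source table.
     */
    public static List<String> buildChunkQueries(final String table,
                                                 final String pkColumn,
                                                 final long minPk,
                                                 final long maxPk,
                                                 final int chunkSize) {
        final List<String> queries = new ArrayList<>();
        for (long start = minPk; start <= maxPk; start += chunkSize) {
            final long end = Math.min(start + chunkSize - 1, maxPk);
            queries.add(String.format(
                    "SELECT * FROM %s WHERE %s BETWEEN %d AND %d ORDER BY %s",
                    table, pkColumn, start, end, pkColumn));
        }
        return queries;
    }

    public static void main(String[] args) {
        // Example: a 10-million-row table split into 1000-record chunks
        // yields roughly 10,000 queries, i.e. one per FlowFile.
        List<String> queries = buildChunkQueries("dbo.Orders", "OrderId", 1, 10_000_000, 1000);
        System.out.println(queries.size() + " chunk queries generated");
    }
}

In the four-step flow described above, each generated string would become the content of one FlowFile for step three to execute against the source.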
>>>>>> On Wed, Jun 8, 2016 at 9:31 AM, Bryan Bende <[email protected]> wrote:
>>>>>>> OK, I didn't realize you had already tried the back-pressure settings. Can you describe the processors a little more? Are they custom processors?
>>>>>>>
>>>>>>> I am guessing that ProcessorA is producing all 5k flow files from a single execution of onTrigger, which would explain why back-pressure didn't solve the problem: back-pressure would stop the processor from executing again, but it's already too late because the first execution already went over the limit.
>>>>>>>
>>>>>>> Without knowing too much about what ProcessorA is doing, I'm wondering if there is a way to put some indirection between the two processors. What if ProcessorA sent its output to a PutFile processor that wrote all the chunks out to a directory, and then a separate GetFile processor concurrently picked up the chunks from that directory and sent them to ProcessorB?
>>>>>>>
>>>>>>> Then the back-pressure between GetFile and ProcessorB would work, because once the queue reached 2000, GetFile wouldn't pick up any more files. The downside is that you would need enough disk space on your NiFi node to possibly store your whole database table, which may not be an option.
>>>>>>>
>>>>>>> Another idea might be to have two levels of chunks. For example, with the SplitText processor, if we want to split a file with 1 million lines in it, rather than doing one split that produces 1 million flow files, we usually do a split to 10k-line chunks and then another split down to 1 line. Maybe ProcessorA could produce much larger chunks, say 10k or 100k records each, and then the next processor further splits those before going to ProcessorB. This would also allow back-pressure to work a little better between the second split processor and ProcessorB.
>>>>>>>
>>>>>>> If anyone else has ideas here, feel free to chime in.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Bryan
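One more option, not raised in the thread but in the same spirit as Bryan's suggestions above: have the generating processor keep its pending chunks as internal state and emit only a bounded batch per onTrigger invocation, so that back-pressure can stop the next invocation once the downstream queue crosses its threshold. The processor below is a hypothetical sketch (the relationship name, the batch size, and the way pendingQueries gets filled are all assumptions), not the ProcessorA from this thread.

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class BoundedChunkEmitter extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Generated chunk queries, one per FlowFile")
            .build();

    // Assumed batch size: at most this many FlowFiles leave per onTrigger call.
    private static final int MAX_FLOWFILES_PER_TRIGGER = 500;

    // Pending chunk queries for the current table; how they get queued (e.g. on
    // the first trigger for an incoming table FlowFile) is omitted from this sketch.
    private final Queue<String> pendingQueries = new ConcurrentLinkedQueue<>();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session)
            throws ProcessException {
        int emitted = 0;
        String query;
        // Emit a bounded batch and return; if the downstream queue is over its
        // back-pressure threshold, the framework will not schedule this processor
        // again until the queue drains, which bounds the queue's growth.
        while (emitted < MAX_FLOWFILES_PER_TRIGGER && (query = pendingQueries.poll()) != null) {
            final String sql = query;
            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile,
                    out -> out.write(sql.getBytes(StandardCharsets.UTF_8)));
            session.transfer(flowFile, REL_SUCCESS);
            emitted++;
        }
        if (emitted == 0) {
            context.yield(); // nothing queued right now; avoid a busy loop
        }
    }
}

With a batch size at or below the connection's back-pressure object threshold, the downstream queue can only overshoot that threshold by at most one batch.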
>> >> >>> >> >> >>> Thanks, >> >> >>> >> >> >>> Bryan >> >> >>> >> >> >>> On Wed, Jun 8, 2016 at 10:51 AM, Shaine Berube < >> >> >>> [email protected]> wrote: >> >> >>> >> >> >>>> I do need more information, because I tried using that option, but >> the >> >> >>>> processor just continued filling the queue anyway, I told it to >> only >> >> >>> allow >> >> >>>> 2000 before back pressure kicks in, but it kept going and I ended >> up >> >> >> with >> >> >>>> 5k files in the queue before I restarted Nifi to get the processor >> to >> >> >>> stop. >> >> >>>> >> >> >>>> On Wed, Jun 8, 2016 at 8:45 AM, Bryan Bende <[email protected]> >> wrote: >> >> >>>> >> >> >>>>> Hello, >> >> >>>>> >> >> >>>>> Take a look at the options available when right-clicking on a >> >> >> queue... >> >> >>>>> What you described is what NiFi calls back-pressure. You can >> >> >>> configured a >> >> >>>>> queue to have an object threshold (# of flow files) or data size >> >> >>>> threshold >> >> >>>>> (total size of all flow files). >> >> >>>>> When one of these thresholds is reached, NiFi will no longer let >> the >> >> >>>> source >> >> >>>>> processor run until the condition goes back under the threshold. >> >> >>>>> >> >> >>>>> Let us know if you need any more info on this. >> >> >>>>> >> >> >>>>> Thanks, >> >> >>>>> >> >> >>>>> Bryan >> >> >>>>> >> >> >>>>> On Wed, Jun 8, 2016 at 10:40 AM, Shaine Berube < >> >> >>>>> [email protected]> wrote: >> >> >>>>> >> >> >>>>>> Hello all, >> >> >>>>>> >> >> >>>>>> I'm kind of new to developing Nifi, though I've been doing some >> >> >>> pretty >> >> >>>> in >> >> >>>>>> depth stuff and some advanced database queries. My question is >> in >> >> >>>>>> regarding the queues between processor, I want to limit a queue >> >> >> to... >> >> >>>> say >> >> >>>>>> 2000, how would I go about doing that? Or better yet, how would >> I >> >> >>> tell >> >> >>>>> the >> >> >>>>>> processor generating the queue to only put a max of 2000 files >> into >> >> >>> the >> >> >>>>>> queue? >> >> >>>>>> >> >> >>>>>> Allow me to explain with a scenario: >> >> >>>>>> We are doing data migration from one database to another. >> >> >>>>>> -Processor A is generating a queue consumed by Processor B >> >> >>>>>> -Processor A is taking configuration and generating SQL queries >> in >> >> >>> 1000 >> >> >>>>>> record chunks so that Processor B can insert them into a new >> >> >>> database. >> >> >>>>>> Given the size of the source database, Processor A can >> potentially >> >> >>>>> generate >> >> >>>>>> hundreds of thousands of files. >> >> >>>>>> >> >> >>>>>> Is there a way for Processor A to check it's down stream queue >> for >> >> >>> the >> >> >>>>>> queue size? How would I get Processor A to only put 2000 files >> >> >> into >> >> >>>> the >> >> >>>>>> queue at any given time, so that Processor A can continue running >> >> >> but >> >> >>>>> wait >> >> >>>>>> for room in the queue? >> >> >>>>>> >> >> >>>>>> Thank you in advance. >> >> >>>>>> >> >> >>>>>> -- >> >> >>>>>> *Shaine Berube* >> >> >>>>>> >> >> >>>>> >> >> >>>> >> >> >>>> >> >> >>>> >> >> >>>> -- >> >> >>>> *Shaine Berube* >> >> >>>> >> >> >>> >> >> >> >> >> >> >> >> >> >> >> >> -- >> >> >> *Shaine Berube* >> >> >> >> >> >> >> >> > >> > >> > -- >> > *Shaine Berube* >> > > > > -- > *Shaine Berube*
