Sumo,
Should be doable. The only part that may be tricky is the filename showing
1_N, if that means the whole thing has to retain sequential ordering from
source through destination. Otherwise...
When merging flowfiles together you need to decide 'how should content be
merged' and 'how should attributes be merged'. The properties that control
that are 'Merge Strategy'/'Merge Format' and 'Attribute Strategy'
respectively. For the content you'll want a binary concatenation style
merge. For the attribute strategy the default of keeping only common
attributes should be sufficient. The reason is that the information you'll
need for writing to HDFS, the databaseName, tableName, and action, will be
common to every flowfile in a bundle. When merging you'll merge by all
three of these attributes combined. You can do this by creating an
attribute that combines those three values right after your extract
attributes processor.
Let's say your extract attributes processor pulls out 'databaseName',
'tableName' and 'action'. If so, put an UpdateAttribute between that
processor and MergeContent (or you could use HashAttribute as well). In it,
create an attribute called 'correlation-identifier' and give it a
value of ${databaseName}-${tableName}-${action}
Then in MergeContent reference that correlation-identifier attribute in the
'Correlation Attribute Name' property.
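To make that concrete, the relevant settings would look roughly like this
(the attribute name is just the example from above, nothing special):

  UpdateAttribute (between extract attributes and MergeContent):
    correlation-identifier = ${databaseName}-${tableName}-${action}

  MergeContent:
    Merge Strategy             = Bin-Packing Algorithm
    Merge Format               = Binary Concatenation
    Attribute Strategy         = Keep Only Common Attributes
    Correlation Attribute Name = correlation-identifier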
Now, given that you'll be concatenating JSON documents together, keep in
mind the resulting output would not be valid JSON itself. You'd need to
either make sure the merged output is itself valid JSON, which you can do
by using MergeContent's header/footer/demarcator feature, or have whatever
reads these merged JSON documents be able to demarcate them for you.
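If you go the header/footer/demarcator route, something along these lines
should turn each bundle into a single JSON array (assuming each incoming
flowfile is exactly one JSON object):

  MergeContent:
    Delimiter Strategy = Text
    Header             = [
    Footer             = ]
    Demarcator         = ,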
If you want to end up with roughly 64MB bundles and these objects can be
quite small (between say 1 and 10KB) then you'd be bundling somewhere
around 6,500 to 65,000 objects each time, and that is not factoring in
compression. I'd recommend a two-phase merge with a GZIP compression step
in between. GZIP is nice as it compresses quite fast and gzip streams can
be safely concatenated. So the 'merge step' would really be:
- First Merge
- GZIP Compress
- Final Merge
In the first merge do bundles of at least 800 objects but no more than
1000, and set an age kick-out of say 1 minute or whatever is appropriate in
your case.
In the GZIP compress step set compression level 1.
In the final merge do bundles of at least 55MB but no more than 64MB, with
an age kick-out of say 5 minutes or whatever is appropriate in your case.
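Put together, a rough sketch of the three steps (the exact counts, sizes,
and ages are just the examples above, so tune them to your flow):

  First MergeContent:
    Minimum Number of Entries = 800
    Maximum Number of Entries = 1000
    Max Bin Age               = 1 min

  CompressContent:
    Mode               = compress
    Compression Format = gzip
    Compression Level  = 1

  Final MergeContent:
    Minimum Group Size = 55 MB
    Maximum Group Size = 64 MB
    Max Bin Age        = 5 min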
Since the common attributes you need will be retained in this model you
will be able to write to HDFS using a path of something like
'/${databaseName}/${tableName}/${action}/${uuid}.whatever'.
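So the last hop would look something like this (setting the filename via
UpdateAttribute is just one option; the merged flowfile's uuid attribute
works fine as a name):

  UpdateAttribute (optional, before PutHDFS):
    filename = ${uuid}.whatever

  PutHDFS:
    Directory = /${databaseName}/${tableName}/${action}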
Now that I got here I just noticed you set 'tar', so presumably you are
using the TAR merge format, and most likely this is to address how to keep
these objects separate and avoid the need for the header/footer/demarcator
approach. Good choice as well.
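If that's the route you take, the only content-related setting that really
changes from the sketch above is the format:

  MergeContent:
    Merge Format = TAR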
There are a lot of ways to slice this up.
Thanks
Joe
On Wed, Jun 15, 2016 at 6:04 PM, Sumanth Chinthagunta <[email protected]>
wrote:
>
> Hi,
> I have following flow that receives JSON data from Kafka and writes to
> HDFS.
> Each flowFile received from Kafka has following attributes and JSON
> payload.
> 1. databaseName = db1 or db2 etc
> 2. tableName = customer or address etc
>
> 3. action = [insert, update, delete]
> My goal is to merge 1000 flowFiles into a single file and write to HDFS
> (because writing large files into HDFS is more efficient than writing small
> JSON files.)
> I also want to write into HDFS folder structure like:
> /<databaseName>/<tableName>/<action>/1_1000.tar
> /<databaseName>/<tableName>/<action>/1000_2000.tar
>
> With default *MergeContent* configuration, I am losing individual
> flowFile’s attributes and cannot organize bin files into directory
> structure. Is it possible to accomplish my goal with MergeContent?
>
> Thanks
> -Sumo
>