Thank you so much for the reply! We have one more question regarding the whole flow:
- The entire flow described above has been encapsulated in a template so that
  it can be exported and imported across environments.
- Let's say one version of the flow is currently running in PROD.
- After making some minor changes, a new version of the flow has been
  imported into PROD.
- Before starting the new version of the flow, we need to stop the old
  version; *otherwise there will be a problem with Kafka consumption from
  the same topic*.
- So if we stop the old version of the flow and then start the new version,
  everything is fine.
- However, some FlowFiles will still be in the queues waiting to be
  processed (for example, for MergeContent). How can we process them
  completely? How can we automate flushing/processing all FlowFiles before
  actually stopping the old version? (A rough sketch of the automation we
  have in mind follows the outline below.)
- Please find the rough flow outline below as well.

*Flow outline*
1. Consume from Kafka
2. Evaluate JSON Path
3. Update Attribute to get year, month and day from a timestamp
4. Convert JSON to Avro
5. Merge Content based on the date attribute
6. Write to GCS
7. Partition refresh
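To make the question concrete, here is a rough sketch of the kind of
automation we have in mind, driving NiFi through its REST API from a small
Python script: stop only the ConsumeKafka processor, wait until the queues in
the old flow's process group have drained, and only then stop the group. This
is just a sketch under assumptions (an unsecured NiFi 1.x instance on
localhost; the two component IDs are placeholders), not a finished
implementation:

import time
import requests

# Assumptions: unsecured NiFi 1.x instance; both IDs below are placeholders.
NIFI = "http://localhost:8080/nifi-api"
CONSUME_KAFKA_ID = "<id-of-ConsumeKafka-processor>"
OLD_FLOW_PG_ID = "<id-of-old-flow-process-group>"

def stop_processor(proc_id):
    """Stop a single processor so that no new data enters the flow."""
    entity = requests.get(f"{NIFI}/processors/{proc_id}").json()
    payload = {
        "revision": entity["revision"],
        "component": {"id": proc_id, "state": "STOPPED"},
    }
    requests.put(f"{NIFI}/processors/{proc_id}", json=payload).raise_for_status()

def queued_flowfiles(pg_id):
    """Number of FlowFiles still queued anywhere in the process group."""
    status = requests.get(f"{NIFI}/flow/process-groups/{pg_id}/status").json()
    return status["processGroupStatus"]["aggregateSnapshot"]["flowFilesQueued"]

def stop_group(pg_id):
    """Stop every component in the process group (the old flow version)."""
    requests.put(
        f"{NIFI}/flow/process-groups/{pg_id}",
        json={"id": pg_id, "state": "STOPPED"},
    ).raise_for_status()

stop_processor(CONSUME_KAFKA_ID)             # 1. cut off Kafka consumption
while queued_flowfiles(OLD_FLOW_PG_ID) > 0:
    time.sleep(10)                           # 2. let downstream queues drain
stop_group(OLD_FLOW_PG_ID)                   # 3. now stop the old version

One thing we are unsure about: FlowFiles that MergeContent has already pulled
into its bins may no longer show up in the queued count, so it is probably
safer to also wait out the configured Max Bin Age before the final stop.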
Thanking you in advance!

______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*


On Mon, May 15, 2017 at 1:53 PM, Andrew Grande <[email protected]> wrote:

> Yes to compress. The output of the merge step is a larger piece of data, no
> more/older than configured by the merge step. It can produce partial
> smaller buckets if it were configured with a max age attribute.
>
> Andrew
>
> On Mon, May 15, 2017, 5:28 AM Anshuman Ghosh <[email protected]> wrote:
>
> > Thank you so much Bryan :-)
> > It is working fine now with the following workflow:
> >
> > *Consume from Kafka ==>*
> > *Evaluate JSON path (Timestamp) ==>*
> > *Update Attribute to get year, month and day; since we receive a
> > 19-digit long Timestamp value, we had to use the following trick:
> > ${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("yyyy", "GMT")} ==>*
> > *Convert JSON to Avro ==>*
> > *Merge Content on similar Attribute (Timestamp - Date) ==>*
> > *Write merged FlowFile onto Google Cloud Storage (GCS) buckets*
> >
> > Let me know whether it can be further improved.
> > Also, will it be okay to use a *CompressContent* processor right after
> > the merge step?
> >
> > Thanking you in advance!
> >
> > ______________________
> >
> > *Kind Regards,*
> > *Anshuman Ghosh*
> > *Contact - +49 179 9090964*
> >
> > On Thu, May 11, 2017 at 4:44 PM, Joe Witt <[email protected]> wrote:
> >
> > > Cool. Bryan offers a good approach now. And this JIRA captures a
> > > really powerful way to do it going forward:
> > > https://issues.apache.org/jira/browse/NIFI-3866
> > >
> > > Thanks
> > > Joe
> > >
> > > On Thu, May 11, 2017 at 10:41 AM, Bryan Bende <[email protected]> wrote:
> > > > If your data is JSON, then you could extract the date field from the
> > > > JSON before you convert to Avro by using EvaluateJsonPath.
> > > >
> > > > From there, let's say you have an attribute called "time" with the
> > > > unix timestamp; you could use an UpdateAttribute processor to create
> > > > attributes for each part of the timestamp:
> > > >
> > > > time.year = ${time:format("yyyy", "GMT")}
> > > > time.month = ${time:format("MM", "GMT")}
> > > > time.day = ${time:format("dd", "GMT")}
> > > >
> > > > Then in PutHDFS you can do something similar to what you were
> > > > already doing:
> > > >
> > > > /year=${time.year}/month=${time.month}/day=${time.day}/
> > > >
> > > > As Joe mentioned, there is a bunch of new record reader/writer
> > > > related capabilities in 1.2.0, and there is a follow-up JIRA to add
> > > > a "record path" which would allow you to extract a value (like your
> > > > date field) from any data format.
> > > >
> > > > On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
> > > > <[email protected]> wrote:
> > > >> Hello Joe,
> > > >>
> > > >> Apologies for the inconvenience; I will keep that in mind going
> > > >> forward!
> > > >>
> > > >> Thank you for your suggestion :-)
> > > >> We have recently built NiFi from the master branch, so it should be
> > > >> similar to 1.2.0.
> > > >> We receive data in JSON format and then convert it to Avro before
> > > >> writing to HDFS.
> > > >> The date field here is a 19-digit Unix timestamp (bigint).
> > > >>
> > > >> It would be really great if you could help a bit with how we can
> > > >> achieve the same with Avro here.
> > > >> Thanking you in advance!
> > > >>
> > > >> ______________________
> > > >>
> > > >> *Kind Regards,*
> > > >> *Anshuman Ghosh*
> > > >> *Contact - +49 179 9090964*
> > > >>
> > > >> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <[email protected]> wrote:
> > > >>
> > > >>> Anshuman
> > > >>>
> > > >>> Hello. Please avoid directly addressing specific developers and
> > > >>> instead just address the mailing list you need (dev or user).
> > > >>>
> > > >>> If your data is CSV, for example, you can use RouteText to
> > > >>> efficiently partition the incoming sets by matching field/column
> > > >>> values, and in so doing you'll now have the flowfile attribute you
> > > >>> need for that group. Then you can merge those together with
> > > >>> MergeContent for like attributes, and when writing to HDFS you can
> > > >>> use that value.
> > > >>>
> > > >>> With the new record reader/writer capabilities in Apache NiFi
> > > >>> 1.2.0 we can now provide a record-oriented PartitionRecord
> > > >>> processor which will then also let you easily do this pattern on
> > > >>> all kinds of formats/schemas in a nice/clean way.
> > > >>>
> > > >>> Joe
> > > >>>
> > > >>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
> > > >>> <[email protected]> wrote:
> > > >>> > Hello everyone,
> > > >>> >
> > > >>> > It would be great if you could help me implement this use-case.
> > > >>> >
> > > >>> > Is there any way (NiFi processor) to use an attribute
> > > >>> > (field/column) value for partitioning when writing the final
> > > >>> > FlowFile to HDFS/other storage?
> > > >>> > Earlier we were using the simple system date
> > > >>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/day=${now():format('dd')}/)
> > > >>> > for this, but that doesn't make sense when we consume old data
> > > >>> > from Kafka and want to partition on the original date (a date
> > > >>> > field inside the Kafka message).
> > > >>> >
> > > >>> > Thank you!
> > > >>> >
> > > >>> > ______________________
> > > >>> >
> > > >>> > Kind Regards,
> > > >>> > Anshuman Ghosh
> > > >>> > Contact - +49 179 9090964
