Thank you so much Bryan :-)
It is working fine now as the following workflow

*Consume from Kafka ==> *
*Evaluate JSON path (Timestamp) ==> *
*Update Attribute to get year, month and day; since we receive a 19 digit
long Timestamp value , we had to use the following trick
(**${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("yyyy",
"GMT")}**) ==> Convert JSON to Avro ==> *
*Merge Content on similar Attribute (Timestamp - Date) ==> *
*Write merged FlowFile onto Google Cloud Storage (GCS) buckets*

​Let me know whether it can be further improvised.
Also will it be okay to use a "*CompressContent*" processor right after
merge step?​


Than
​king you in advance!​

​
______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*



On Thu, May 11, 2017 at 4:44 PM, Joe Witt <[email protected]> wrote:

> Cool.  Bryan offers a good approach now.  And this JIRA captures a
> really powerful way to do it going forward
> https://issues.apache.org/jira/browse/NIFI-3866
>
> Thanks
> Joe
>
> On Thu, May 11, 2017 at 10:41 AM, Bryan Bende <[email protected]> wrote:
> > If your data is JSON, then you could extract the date field from the
> > JSON before you convert to Avro by using EvaluateJsonPath.
> >
> > From there lets say you have an attribute called "time" with the unix
> > timestamp, you could use an UpdateAttribute processor to create
> > attributes for each part of the timestamp:
> >
> > time.year = ${time:format("yyyy", "GMT")}
> > time.month = ${time:format("MM", "GMT")}
> > time.day = ${time:format("dd", "GMT")}
> >
> > Then in PutHDFS you can do something similar to what you were already
> doing:
> >
> > /year=${time.year}/month=${time.month}/day=${time.day}/
> >
> > As Joe mentioned there is a bunch of new record reader/writer related
> > capabilities in 1.2.0, and there is a follow JIRA to add a "record
> > path" which would allow you to extract a value (like your date field)
> > from any data format.
> >
> > On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
> > <[email protected]> wrote:
> >> Hello Joe,
> >>
> >> Regret for the inconvenience, I would keep that in mind going forward!
> >>
> >> Thank you for your suggestion :-)
> >> We have recently built NiFi from the master branch, so it should be
> similar
> >> to 1.2.0
> >> We receive data in JSON format and then convert to Avro before writing
> to
> >> HDFS.
> >> The date filed here is an Unix timestamp of 19 digit (bigint)
> >>
> >> It would be really great if you can help a bit on how we can achieve the
> >> same with Avro here.
> >> Thanking you in advance!
> >>
> >>
> >> ______________________
> >>
> >> *Kind Regards,*
> >> *Anshuman Ghosh*
> >> *Contact - +49 179 9090964*
> >>
> >>
> >> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <[email protected]> wrote:
> >>
> >>> Anshuman
> >>>
> >>> Hello.  Please avoid directly addressing specific developers and
> >>> instead just address the mailing list you need (dev or user).
> >>>
> >>> If your data is CSV, for example, you can use RouteText to efficiently
> >>> partition the incoming sets by matching field/column values and in so
> >>> doing you'll now have the flowfile attribute you need for that group.
> >>> Then you can merge those together with MergeContent for like
> >>> attributes and when writing to HDFS you can use that value.
> >>>
> >>> With the next record reader/writer capabilities in Apache NiFI 1.2.0
> >>> we can now provide a record oriented PartitionRecord processor which
> >>> will then also let you easily do this pattern on all kinds of
> >>> formats/schemas in a nice/clean way.
> >>>
> >>> Joe
> >>>
> >>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
> >>> <[email protected]> wrote:
> >>> > Hello everyone,
> >>> >
> >>> > It would be great if you can help me implementing this use-case
> >>> >
> >>> > Is there any way (NiFi processor) to use an attribute (field/ column)
> >>> value
> >>> > for partitioning when writing the final FlowFile to HDFS/ other
> storage.
> >>> > Earlier we were using simple system date
> >>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
> >>> day=${now():format('dd')}/)
> >>> > for this but that doesn't make sense when we consume old data from
> Kafka
> >>> and
> >>> > want to partition on original date (a date field inside Kafka
> message)
> >>> >
> >>> >
> >>> > Thank you!
> >>> > ______________________
> >>> >
> >>> > Kind Regards,
> >>> > Anshuman Ghosh
> >>> > Contact - +49 179 9090964
> >>> >
> >>>
>

Reply via email to