Thank you so much for the reply!

We have one more question regarding the whole flow

   - This entire flow described above has been encapsulated inside a
   template for export and import purpose in different environment.
   - Let's say one version of a Flow is running presently in PROD.
   - After making some minor changes, a new version of the flow has been
   imported into PROD.
   - Before starting the new version of the flow, we need to stop the old
   version; *otherwise there will be a problem with Kafka consumption from
   the same topic*.
   - So if we stop the old version of the flow and then start the new
   version of the flow it will be fine!
   - However there will be some FlowFiles still in the queue to be
   processed (like for Merge Content). How to completely process them? How to
   automate flush/ process of all FlowFiles before actually stop and mark it
   as an old version?
   - Find the rough Flow outline below as well

*Flow outline*
1. Consume from Kafka
2. Evaluate JSON Path
3. Update attribute to get year, month, day from a Timestamp
4. Convert JSON to Avro
5. Merged content based on the date attribute
6. Write to GCS
7. Partitions refresh


Than
​king
 you
​ in advance​
!
​
______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*


On Mon, May 15, 2017 at 1:53 PM, Andrew Grande <[email protected]> wrote:

> Yes to compress. The output of the merge step is a larger piece of data, no
> more/older than configured by the merge step. It can produce partial
> smaller buckets if it were configured with max age attribute.
>
> Andrew
>
> On Mon, May 15, 2017, 5:28 AM Anshuman Ghosh <[email protected]
> >
> wrote:
>
> > Thank you so much Bryan :-)
> > It is working fine now as the following workflow
> >
> > *Consume from Kafka ==> *
> > *Evaluate JSON path (Timestamp) ==> *
> > *Update Attribute to get year, month and day; since we receive a 19 digit
> > long Timestamp value , we had to use the following trick
> >
> > (**${Click.RequestTimestamp:toString():substring(0,13):
> toNumber():format("yyyy",
> > "GMT")}**) ==> Convert JSON to Avro ==> *
> > *Merge Content on similar Attribute (Timestamp - Date) ==> *
> > *Write merged FlowFile onto Google Cloud Storage (GCS) buckets*
> >
> > ​Let me know whether it can be further improvised.
> > Also will it be okay to use a "*CompressContent*" processor right after
> > merge step?​
> >
> >
> > Than
> > ​king you in advance!​
> >
> > ​
> > ______________________
> >
> > *Kind Regards,*
> > *Anshuman Ghosh*
> > *Contact - +49 179 9090964*
> >
> >
> >
> > On Thu, May 11, 2017 at 4:44 PM, Joe Witt <[email protected]> wrote:
> >
> > > Cool.  Bryan offers a good approach now.  And this JIRA captures a
> > > really powerful way to do it going forward
> > > https://issues.apache.org/jira/browse/NIFI-3866
> > >
> > > Thanks
> > > Joe
> > >
> > > On Thu, May 11, 2017 at 10:41 AM, Bryan Bende <[email protected]>
> wrote:
> > > > If your data is JSON, then you could extract the date field from the
> > > > JSON before you convert to Avro by using EvaluateJsonPath.
> > > >
> > > > From there lets say you have an attribute called "time" with the unix
> > > > timestamp, you could use an UpdateAttribute processor to create
> > > > attributes for each part of the timestamp:
> > > >
> > > > time.year = ${time:format("yyyy", "GMT")}
> > > > time.month = ${time:format("MM", "GMT")}
> > > > time.day = ${time:format("dd", "GMT")}
> > > >
> > > > Then in PutHDFS you can do something similar to what you were already
> > > doing:
> > > >
> > > > /year=${time.year}/month=${time.month}/day=${time.day}/
> > > >
> > > > As Joe mentioned there is a bunch of new record reader/writer related
> > > > capabilities in 1.2.0, and there is a follow JIRA to add a "record
> > > > path" which would allow you to extract a value (like your date field)
> > > > from any data format.
> > > >
> > > > On Thu, May 11, 2017 at 10:04 AM, Anshuman Ghosh
> > > > <[email protected]> wrote:
> > > >> Hello Joe,
> > > >>
> > > >> Regret for the inconvenience, I would keep that in mind going
> forward!
> > > >>
> > > >> Thank you for your suggestion :-)
> > > >> We have recently built NiFi from the master branch, so it should be
> > > similar
> > > >> to 1.2.0
> > > >> We receive data in JSON format and then convert to Avro before
> writing
> > > to
> > > >> HDFS.
> > > >> The date filed here is an Unix timestamp of 19 digit (bigint)
> > > >>
> > > >> It would be really great if you can help a bit on how we can achieve
> > the
> > > >> same with Avro here.
> > > >> Thanking you in advance!
> > > >>
> > > >>
> > > >> ______________________
> > > >>
> > > >> *Kind Regards,*
> > > >> *Anshuman Ghosh*
> > > >> *Contact - +49 179 9090964*
> > > >>
> > > >>
> > > >> On Thu, May 11, 2017 at 3:53 PM, Joe Witt <[email protected]>
> wrote:
> > > >>
> > > >>> Anshuman
> > > >>>
> > > >>> Hello.  Please avoid directly addressing specific developers and
> > > >>> instead just address the mailing list you need (dev or user).
> > > >>>
> > > >>> If your data is CSV, for example, you can use RouteText to
> > efficiently
> > > >>> partition the incoming sets by matching field/column values and in
> so
> > > >>> doing you'll now have the flowfile attribute you need for that
> group.
> > > >>> Then you can merge those together with MergeContent for like
> > > >>> attributes and when writing to HDFS you can use that value.
> > > >>>
> > > >>> With the next record reader/writer capabilities in Apache NiFI
> 1.2.0
> > > >>> we can now provide a record oriented PartitionRecord processor
> which
> > > >>> will then also let you easily do this pattern on all kinds of
> > > >>> formats/schemas in a nice/clean way.
> > > >>>
> > > >>> Joe
> > > >>>
> > > >>> On Thu, May 11, 2017 at 9:49 AM, Anshuman Ghosh
> > > >>> <[email protected]> wrote:
> > > >>> > Hello everyone,
> > > >>> >
> > > >>> > It would be great if you can help me implementing this use-case
> > > >>> >
> > > >>> > Is there any way (NiFi processor) to use an attribute (field/
> > column)
> > > >>> value
> > > >>> > for partitioning when writing the final FlowFile to HDFS/ other
> > > storage.
> > > >>> > Earlier we were using simple system date
> > > >>> > (/year=${now():format('yyyy')}/month=${now():format('MM')}/
> > > >>> day=${now():format('dd')}/)
> > > >>> > for this but that doesn't make sense when we consume old data
> from
> > > Kafka
> > > >>> and
> > > >>> > want to partition on original date (a date field inside Kafka
> > > message)
> > > >>> >
> > > >>> >
> > > >>> > Thank you!
> > > >>> > ______________________
> > > >>> >
> > > >>> > Kind Regards,
> > > >>> > Anshuman Ghosh
> > > >>> > Contact - +49 179 9090964
> > > >>> >
> > > >>>
> > >
> >
>

Reply via email to