Hello Andrey,

Thanks for your quick response. I tried the code above, but it doesn't meet
my requirement. I need global ordering of my records across multiple Kafka
partitions. Could you suggest a workaround? As mentioned on this page
<https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams>,
is it possible to buffer data for some amount of time and then sort it, or is
there some other way?
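
To make the "buffer for some amount of time, then sort" idea concrete, here
is a minimal plain-Java sketch. It is only a simulation and does not use any
Flink API: the class BoundedSortBuffer and its methods add, flush and
lateEvents are hypothetical names chosen for illustration. It assumes events
are represented by their timestamps in milliseconds and that a record may
arrive at most a fixed number of milliseconds behind the largest timestamp
seen so far. In Flink, similar logic would presumably sit in a function
downstream of assignTimestampsAndWatermarks, driven by watermarks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Simulation only: holds events back for a bounded time and releases them in timestamp order. */
public class BoundedSortBuffer {
    private final long maxOutOfOrdernessMs;                            // how long to wait for stragglers
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();  // min-heap ordered by timestamp
    private final List<Long> late = new ArrayList<>();                 // events that arrived too late to be ordered
    private long maxSeen = Long.MIN_VALUE;                             // largest timestamp seen so far
    private long watermark = Long.MIN_VALUE;                           // everything at or below this was emitted

    public BoundedSortBuffer(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Accepts one event timestamp and returns the events that are now safe to emit, in order. */
    public List<Long> add(long timestamp) {
        List<Long> ready = new ArrayList<>();
        if (timestamp <= watermark) {
            // Its slot in the output has already passed; emitting it would break the order.
            late.add(timestamp);
            return ready;
        }
        buffer.add(timestamp);
        maxSeen = Math.max(maxSeen, timestamp);
        watermark = maxSeen - maxOutOfOrdernessMs;
        // Release every buffered event that can no longer be overtaken by an on-time record.
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }

    /** Drains whatever is still buffered, in order (for example at end of input). */
    public List<Long> flush() {
        List<Long> rest = new ArrayList<>();
        while (!buffer.isEmpty()) {
            rest.add(buffer.poll());
        }
        return rest;
    }

    public List<Long> lateEvents() {
        return late;
    }

    public static void main(String[] args) {
        // Interleaved arrivals from several partitions; 4000 shows up after 7000 was seen.
        long[] arrivals = {1000, 2000, 1500, 3000, 2500, 7000, 4000, 8000};
        BoundedSortBuffer sorter = new BoundedSortBuffer(2000);
        List<Long> emitted = new ArrayList<>();
        for (long ts : arrivals) {
            emitted.addAll(sorter.add(ts));
        }
        emitted.addAll(sorter.flush());
        System.out.println("emitted in order: " + emitted);
        System.out.println("late: " + sorter.lateEvents());
    }
}
```

With a bound of 2000 ms the record 4000 is set aside as late, because 7000
had already moved the watermark to 5000. A larger bound tolerates more
disorder but delays every record by that much, which is the latency trade-off
discussed in the quoted replies below.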

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Tue, Jun 19, 2018 at 10:19 PM, Andrey Zagrebin <and...@data-artisans.com>
wrote:

> Hi Amol,
>
> I think you could try (based on your Stack Overflow code)
> org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
> like this:
>
> DataStream<Document> streamSource = env
>    .addSource(kafkaConsumer)
>    .setParallelism(4)
>    .assignTimestampsAndWatermarks(
>        // tolerate records arriving up to 3.5 seconds out of order
>        new BoundedOutOfOrdernessTimestampExtractor<Document>(Time.milliseconds(3500)) {
>            @Override
>            public long extractTimestamp(Document element) {
>                Map timeStamp = (Map) element.get("ts");
>                return (long) timeStamp.get("value");
>            }
>        });
>
> In general, if records are sorted by anything within a Kafka partition, the
> parallel subtask of the Flink Kafka source will consume these records and
> push them to user operators in the same order. There is at most one
> consuming subtask per Kafka partition, but one subtask might serve several
> partitions. This gives the same guarantees as Kafka itself: ordering within
> a partition but not across partitions, so no global ordering.
>
> The cases of global and per-window ordering are already described by Sihua.
> Global ordering might be impractical in a distributed system.
>
> If a subtask of your Flink operator consumes from several partitions, or
> there is no ordering at all, you can try the above approach with
> BoundedOutOfOrdernessTimestampExtractor to get approximate ordering
> across these partitions, per key or for all records. It is similar to
> ordering within a window. Late records can still arrive after the
> out-of-orderness period and break the ordering. This operator buffers
> records in state to maintain the order, but only for the out-of-orderness
> period, which also increases latency.
>
> Cheers,
> Andrey
>
> > On 19 Jun 2018, at 14:12, sihua zhou <summerle...@163.com> wrote:
> >
> >
> >
> > Hi Amol,
> >
> >
> > I'm not sure whether this is possible, especially when you need to
> > process the records with a parallelism greater than one.
> >
> >
> > IMO, in theory, we can only get an ordered stream when there is a single
> > Kafka partition and it is processed with a parallelism of one in Flink.
> > Even in this case, if you only want to order the records within a window,
> > then you need to store the records in state and order them when the window
> > is triggered. But if you want to order the records with just a
> > `keyBy()` (non-windowed), I think that's probably impossible in practice,
> > because you need to store all the incoming records and re-order all the
> > data for every incoming record, and you also need to send a retraction
> > message for the previous result (because every incoming record might
> > change the global order of the records).
> >
> >
> > Best, Sihua
> > On 06/19/2018 19:19,Amol S - iProgrammer<am...@iprogrammer.com> wrote:
> > Hi,
> >
> > I have used the Flink streaming API in my application, where the source
> > of the stream is Kafka. My Kafka producer publishes data in ascending
> > order of time to different Kafka partitions, and the consumer reads data
> > from these partitions. However, some Kafka partitions may be slow due to
> > some operation and produce late results. Is there any way to maintain
> > order in this stream even though the data arrive out of order? I have
> > tried BoundedOutOfOrdernessTimestampExtractor, but it didn't serve the
> > purpose. While digging into this problem I came across your documentation
> > (URL: https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams)
> > and tried to implement it, but it didn't work. I also tried the Table API
> > order by, but it seems orderBy is not supported in Flink 1.5.
> > Please suggest a workaround.
> >
> > I have raised the same question on Stack Overflow:
> >
> > https://stackoverflow.com/questions/50904615/ordering-of-streams-while-reading-data-from-multiple-kafka-partitions
> >
> > Thanks,
> >
>
>
