Hello andrey,

Thanks for the help.

I am trying to implement your above given code code

BoundedOutOfOrdernessTimestampExtractor<>(…) {…})
    .process(new OrderTheRecords()))

but I am facing issues to write *OrderTheRecords *class as I am new to this
framework can you please suggest me what is optimal way to sort the records?

I have implemented below ProcessWindowFunction code

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class OrderTheRecords extends ProcessWindowFunction<Oplog,
Oplog, Long, TimeWindow> {

    public void process(Long s, Context context, Iterable<Oplog>
iterable, Collector<Oplog> collector) throws Exception {
        for (Oplog oplog : iterable) {


public class Oplog {

    private OplogTimestamp ts;
    private String op;
    private BasicDBObject o;


here *ts* represents even timestamp.

*Amol Suryawanshi*
Java Developer

*iProgrammer Solutions Pvt. Ltd.*

*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>

On Wed, Jun 20, 2018 at 6:51 PM, Andrey Zagrebin <and...@data-artisans.com>

> Hi Amol,
> In above code also it will sort the records in specific time window only.
> All windows will be emitted as watermark passes the end of the window. The
> watermark only increases. So the non-overlapping windows should be also
> sorted by time and as a consequence the records across windows either, if
> this is the concern about sorting records only in a specific time window.
> 1. How should I create N consumers dynamically based on partition count?
> sourceStream.setParallelism(N), each Flink consumer parallel subtask will
> serve one Kafka partition.
> 2. Is number of consumers dynamically grows as number of partition
> increased in middle of execution?
> Dynamically added Kafka partitions will be eventually discovered by Flink
> consumers (flink.partition-discovery.interval-millis) and picked up by
> some consumer. Flink job has be rescaled separately.
> Currently parallelism of Flink operator cannot be changed while the job is
> running. The way to go now is to use savepoint/checkpoint, stop the job and
> start the new one with changed parallelism from the
> previous savepoint/checkpoint (see Flink docs). New job will pick up from
> partition offsets of previous job.
> 3. How to create partition specific kafka consumer in flink?
> The partition-consumer assignment is now implementation specific for Flink.
> There is an open issue for custom assignment https://issues.apac
> he.org/jira/browse/FLINK-8570 e.g. if you need specific locality of
> keys/consumers.
> I would simply suggest to assign some key to each record and let all
> records for particular key to go into the same Kafka partition. On the
> Flink side if a corresponding keyBy() is applied to the Kafka source, all
> the records for this particular key will go to the same parallel subtask of
> subsequent operator, sorted by time if they were originally sorted in its
> Kafka partition. This is more scalable approach than total global ordering.
> Cheers,
> Andrey
> On 20 Jun 2018, at 13:17, Amol S - iProgrammer <am...@iprogrammer.com>
> wrote:
> Hello Andrey,
> In above code also it will sort the records in specific time window only.
> Anyways we agreed to create N number of partitions with N number of
> consumers based on some key as order is maintained per kafka partition.
> I have some questions about this.
> 1. How should I create N consumers dynamically based on partition count?
> 2. Is number of consumers dynamically grows as number of partition
> increased in middle of execution?
> 3. How to create partition specific kafka consumer in flink?
> -----------------------------------------------
> *Amol Suryawanshi*
> Java Developer
> am...@iprogrammer.com
> *iProgrammer Solutions Pvt. Ltd.*
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com <sac...@iprogrammer.com>
> ------------------------------------------------
> On Wed, Jun 20, 2018 at 2:38 PM, Andrey Zagrebin <and...@data-artisans.com
> >
> wrote:
> Hi,
> Good point, sorry for confusion, BoundedOutOfOrdernessTimestampExtractor
> of course does not buffer records, you need to apply windowing
> (e.g. TumblingEventTimeWindows) for that and then sort the window output
> by time and emit records in sorted order.
> You can also use windowAll which already does keyBy((record) -> 0) and
> makes the stream non-parallel:
> sourceStream
>            .setParallelism(4)
>            .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor<>(…) {…})
>            .windowAll(TumblingEventTimeWindows.of(Time...))
>    .process(new OrderTheRecords()))
> Cheers,
> Andrey
> On 20 Jun 2018, at 10:14, sihua zhou <summerle...@163.com> wrote:
> Hi,
> I think a global ordering is a bit impractical on production, but in
> theroy, you still can do that. You need to
> - Firstly fix the operate's parallelism to 1(except the source node).
> - If you want to sort the records within a bouned time, then you can
> keyBy() a constant and window it, buffer the records in the state and sort
> the records when the window is triggered, the code maybe as follows.
>    {code}
>        sourceStream
>            .setParallelism(4)
>            .assignTimestampsAndWatermarks(
>                new BoundedOutOfOrdernessTimestamp
> Extractor<Document>(Time.milliseconds(3500))
>                {
>                    @Override
>                    public long extractTimestamp(Event element) {
>                        Map timeStamp = (Map) event.get("ts”);
>                        return (long) timeStamp.get("value");
>                    }
>             })
>            .keyBy((record) -> 0)// keyby the constant value
>            .window(...)
>            .process(new OrderTheRecords()))
>            .setParallelism(1);
>    {code}
> - If you want to sort the records truly globally(non-window), then you
> could keyBy a constant, store the records in the state and sort the records
> in the process() function for every incoming record. And if you want a
> perfect correct output, then maybe you need to do retraction (because every
> incoming records may change the globally order), the code maybe as follows
>    {code}
>        sourceStream
>            .setParallelism(4)
>            .keyBy((record) -> 0) // keyby the constant value
>            .process(new OrderTheRecords()))
>            .setParallelism(1);
>    {code}
> In all the case, you need to fix the parallelism of the OrderTheRecord
> operate to 1, which makes your job non-scale-able and becomes the
> bottleneck. So a global ordering maybe not practical on production (but if
> the source's TPS is very low, then maybe practical).
> Best, Sihua
> On 06/20/2018 15:36,Amol S - iProgrammer<am...@iprogrammer.com>
> <am...@iprogrammer.com> wrote:
> Hello Andrey,
> Thanks for your quick response. I have tried with your above code but it
> didn't suit's my requirement. I need global ordering of my records by using
> multiple kafka partitions. Please suggest me any workaround for this. as
> mentioned in this
> <https://cwiki.apache.org/confluence/display/FLINK/Time+
> and+Order+in+Streams>
> link is it possible to buffer data for some amount of time and then perform
> sort on this or any other way out there?
> -----------------------------------------------
> *Amol Suryawanshi*
> Java Developer
> am...@iprogrammer.com
> *iProgrammer Solutions Pvt. Ltd.*
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com <sac...@iprogrammer.com>
> ------------------------------------------------
> On Tue, Jun 19, 2018 at 10:19 PM, Andrey Zagrebin <
> and...@data-artisans.com>
> wrote:
> Hi Amol,
> I think you could try (based on your stack overflow code)
> org.apache.flink.streaming.api.functions.timestamps.
> BoundedOutOfOrdernessTimestampExtractor
> like this:
> DataStream<Document> streamSource = env
> .addSource(kafkaConsumer)
> .setParallelism(4)
> .assignTimestampsAndWatermarks(
> new BoundedOutOfOrdernessTimestampExtractor<Document>(Time.
> milliseconds(3500))
> {
> @Override
> public long extractTimestamp(Event element) {
> Map timeStamp = (Map) event.get("ts”);
> return (long) timeStamp.get("value");
> }
> });
> In general, if records are sorted by anything in a Kafka partition,
> parallel subtask of Flink Kafka source will consume these records and push
> to user operators in the same order. There is maximum one consuming subtask
> per Kafka partition but several partitions might be served by one subtask.
> It means that there are the same guarantees as in Kafka: ordering per
> partition but not across them, including no global ordering.
> The case of global and per window ordering is already described by Sihua.
> The global ordering might be impractical in case of distributed system.
> If a subtask of your Flink operator consumes from several partitions or
> there is no ordering at all, you can try the above approach with
> BoundedOutOfOrdernessTimestampExtractor to get approximate ordering
> across these partitions per key or all records. It is similar to ordering
> within a window. It means there could still be late records coming after
> out of orderness period of time which can break the ordering. This operator
> buffers records in state to maintain the order but only for out of
> orderness period of time which also increases latency.
> Cheers,
> Andrey
> On 19 Jun 2018, at 14:12, sihua zhou <summerle...@163.com> wrote:
> Hi Amol,
> I'm not sure whether this is impossible, especially when you need to
> operate the record in multi parallelism.
> IMO, in theroy, we can only get a ordered stream when there is a single
> partition of kafka and operate it with a single parallelism in flink. Even
> in this case, if you only want to order the records in a window, than you
> need to store the records in the state, and order them when the window is
> triggered. But if you want to order the records with a single
> `keyBy()`(non-window), I think that's maybe impossible in practice, because
> you need to store the all the incoming records and order the all data for
> every incoming records, also you need to send retracted message for the
> previous result(because every incoming record might change the global order
> of the records).
> Best, Sihua
> On 06/19/2018 19:19,Amol S - iProgrammer<am...@iprogrammer.com> wrote:
> Hi,
> I have used flink streaming API in my application where the source of
> streaming is kafka. My kafka producer will publish data in ascending
> order
> of time in different partitions of kafka and consumer will read data from
> these partitions. However some kafka partitions may be slow due to some
> operation and produce late results. Is there any way to maintain order in
> this stream though the data arrive out of order. I have tried
> BoundedOutOfOrdernessTimestampExtractor but it didn't served the
> purpose.
> While digging this problem I came across your documentation (URL:
> https://cwiki.apache.org/confluence/display/FLINK/Time+
> and+Order+in+Streams)
> and tried to implement this but it didnt worked. I also tried with Table
> API order by but it seems you not support orderBy in flink 1.5 version.
> Please suggest me any workaround for this.
> I have raised same concern on stack overflow
> https://stackoverflow.com/questions/50904615/ordering-
> of-streams-while-reading-data-from-multiple-kafka-partitions
> Thanks,
> -----------------------------------------------
> *Amol Suryawanshi*
> Java Developer
> am...@iprogrammer.com
> *iProgrammer Solutions Pvt. Ltd.*
> *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
> Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune -
> 411016,
> MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
> www.iprogrammer.com <sac...@iprogrammer.com>
> ------------------------------------------------

Reply via email to