Hi Ramya,

This would be a great feature, but unfortunately it is not supported by
Flink SQL (yet).
Currently, all late records are dropped.
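
Here is what "late" means for a query like yours, with a 5-minute TUMBLE window and a watermark equal to the maximum seen timestamp minus 10 s. The window operator drops a record once the end of that record's window has passed the watermark. A record that is merely behind the watermark is not dropped. Below is a small plain-Java model of that rule. It assumes zero allowed lateness and simplifies things so that the watermark advances after every record (Flink actually emits it periodically). The class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class LatenessModel {
    public static final long WINDOW_MS = 5 * 60 * 1000L; // TUMBLE(ts, INTERVAL '5' MINUTE)
    public static final long MAX_DELAY_MS = 10_000L;     // watermarksPeriodicBounded(10000)

    // Start of the tumbling window (offset 0) that contains ts.
    public static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_MS);
    }

    // Dropped once the last timestamp of the record's window is <= the watermark.
    public static boolean isDropped(long ts, long watermark) {
        return windowStart(ts) + WINDOW_MS - 1 <= watermark;
    }

    // Simplified model: the watermark advances after every record, not periodically.
    public static List<Long> droppedRecords(long[] timestamps) {
        List<Long> dropped = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (isDropped(ts, watermark)) {
                dropped.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - MAX_DELAY_MS;
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 290_000L, 320_000L, 299_000L, 305_000L, 250_000L};
        System.out.println(droppedRecords(ts)); // prints [299000, 250000]
    }
}
```

In this sequence, 305000 arrives when the watermark is already at 310000. It is still counted because its window [300000, 600000) is open. In contrast, 299000 and 250000 fall into the already-closed window [0, 300000) and are dropped.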

A workaround is to ingest the stream as a DataStream, use a custom
operator that routes all late records to a side output, and register the
DataStream without the late records as a table on which the SQL query is
evaluated.
This requires quite a bit of boilerplate code, but it could be hidden in a
utility class.
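
The workaround could look roughly like the sketch below, written against the Flink 1.7 DataStream and Table APIs. It assumes a few things:

- The Kafka records are already deserialized into a DataStream<Row>, for example with FlinkKafkaConsumer011 and JsonRowDeserializationSchema.
- The stream's RowTypeInfo names the fields as in your schema.
- The event time is a java.sql.Timestamp in a field called ts at position 8.

Those field details, the class LateRecordSideOutput and the helper registerWithoutLateRecords are hypothetical. The splitter applies the window operator's drop condition itself, so its side output should contain the records that the GROUP BY window would otherwise drop:

```java
import java.sql.Timestamp;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public final class LateRecordSideOutput {

    // Hypothetical: position of the java.sql.Timestamp event-time field in each Row.
    private static final int TS_INDEX = 8;
    // Must match the window size used in the SQL query.
    private static final long WINDOW_MS = Time.minutes(5).toMilliseconds();

    /** Forwards on-time rows; sends rows whose tumbling window is already closed to lateTag. */
    public static final class LateRecordSplitter extends ProcessFunction<Row, Row> {
        private final OutputTag<Row> lateTag;

        public LateRecordSplitter(OutputTag<Row> lateTag) {
            this.lateTag = lateTag;
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) {
            long ts = ctx.timestamp(); // assigned by the extractor below
            long windowEnd = TimeWindow.getWindowStartWithOffset(ts, 0, WINDOW_MS) + WINDOW_MS;
            // Mirrors the window operator's drop condition (maxTimestamp <= watermark, no allowed lateness).
            if (windowEnd - 1 <= ctx.timerService().currentWatermark()) {
                ctx.output(lateTag, row);
            } else {
                out.collect(row);
            }
        }
    }

    /** Registers the on-time rows as "sourceTopic" and returns the late rows for debugging. */
    public static DataStream<Row> registerWithoutLateRecords(StreamExecutionEnvironment env,
                                                             StreamTableEnvironment tableEnv,
                                                             DataStream<Row> rows,
                                                             TypeInformation<Row> rowType) {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Bounded out-of-orderness of 10 s, like watermarksPeriodicBounded(10000).
        DataStream<Row> withTimestamps = rows.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(Row row) {
                        return ((Timestamp) row.getField(TS_INDEX)).getTime();
                    }
                });

        OutputTag<Row> lateTag = new OutputTag<>("late-records", rowType);
        SingleOutputStreamOperator<Row> onTime = withTimestamps
                .process(new LateRecordSplitter(lateTag))
                .returns(rowType); // Row field types cannot be extracted by reflection

        // "ts.rowtime" replaces the ts field with the event-time attribute.
        tableEnv.registerDataStream("sourceTopic", onTime,
                "sid, _zpsbd6, r1, r2, r5, r10, isBot, botcode, ts.rowtime");

        return onTime.getSideOutput(lateTag);
    }
}
```

To use it, you would call LateRecordSideOutput.registerWithoutLateRecords(env, tableEnv, rows, rowType).print() (or add any other sink) before tableEnv.sqlQuery(sql). The splitter checks its own input watermark, and the window operator's watermark can only lag behind it. So the records it forwards should not be dropped downstream. With parallelism > 1, it may occasionally divert a record that the window would still have accepted.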

Best, Fabian

On Thu, Jan 24, 2019 at 06:42 Ramya Ramamurthy <hair...@gmail.com> wrote:

> Hi,
>
> I have a question regarding late-arriving records.
> We are using Flink 1.7 with Kafka consumers of version 2.11.0.11.
> In my sink operator, which converts this table to a stream that is
> pushed to Elasticsearch, I can see the metric
> *numLateRecordsDropped*.
>
> My Kafka consumers don't seem to have any lag, and the events are
> processed properly. Routing these events to a side output doesn't
> seem to be possible with tables. Below is the snippet:
>
>         tableEnv.connect(new Kafka()
>           /* setting of all kafka properties */
>                .startFromLatest())
>                .withSchema(new Schema()
>                        .field("sid", Types.STRING())
>                        .field("_zpsbd6", Types.STRING())
>                        .field("r1", Types.STRING())
>                        .field("r2", Types.STRING())
>                        .field("r5", Types.STRING())
>                        .field("r10", Types.STRING())
>                        .field("isBot", Types.BOOLEAN())
>                        .field("botcode", Types.STRING())
>                        .field("ts", Types.SQL_TIMESTAMP())
>                        .rowtime(new Rowtime()
>                                .timestampsFromField("recvdTime")
>                                .watermarksPeriodicBounded(10000)
>                        )
>                )
>                .withFormat(new Json().deriveSchema())
>                .inAppendMode()
>                .registerTableSource("sourceTopic");
>
>        String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
>                + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
>                + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
>                + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
>                + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
>
> Table source = tableEnv.sqlQuery(sql); ---> This is where the
> numLateRecordsDropped metric shows up, while executing the GROUP BY
> (window) operation.
>
> Is there a way to get these late records as a side output, so that I can debug this better?
>
> Thanks,
> ~Ramya.
>
