Hi Ramya,

This would be a great feature, but unfortunately it is not supported (yet) by Flink SQL. Currently, all late records are dropped.
A workaround is to ingest the stream as a DataStream, use a custom operator that routes all late records to a side output, and register the DataStream without the late records as a table on which the SQL query is evaluated. This requires quite a bit of boilerplate code, but it could be hidden in a utility class.

Best, Fabian

On Thu, Jan 24, 2019 at 06:42, Ramya Ramamurthy <hair...@gmail.com> wrote:

> Hi,
>
> I have a query with regard to late-arriving records.
> We are using Flink 1.7 with Kafka Consumers of version 2.11.0.11.
> In my sink operators, which convert this table to a stream that is
> pushed to Elasticsearch, I am able to see the metric
> "*numLateRecordsDropped*".
>
> My Kafka consumers don't seem to have any lag and the events are
> processed properly. Taking these events to a side output
> doesn't seem to be possible with tables. Below is the snippet:
>
> tableEnv.connect(new Kafka()
>         /* setting of all kafka properties */
>         .startFromLatest())
>     .withSchema(new Schema()
>         .field("sid", Types.STRING())
>         .field("_zpsbd6", Types.STRING())
>         .field("r1", Types.STRING())
>         .field("r2", Types.STRING())
>         .field("r5", Types.STRING())
>         .field("r10", Types.STRING())
>         .field("isBot", Types.BOOLEAN())
>         .field("botcode", Types.STRING())
>         .field("ts", Types.SQL_TIMESTAMP())
>         .rowtime(new Rowtime()
>             .timestampsFromField("recvdTime")
>             .watermarksPeriodicBounded(10000)
>         )
>     )
>     .withFormat(new Json().deriveSchema())
>     .inAppendMode()
>     .registerTableSource("sourceTopic");
>
> String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
>     + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
>     + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
>     + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
>     + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
>
> Table source = tableEnv.sqlQuery(sql) ---> This is where the metric is
> showing the lateRecordsDropped, while executing the group by operation.
>
> Is there a way to get the sideOutput of this to be able to debug better?
>
> Thanks,
> ~Ramya.
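
The workaround in the reply above hinges on one decision per record: is it already too late for its window? Below is a minimal sketch of that check in plain Java, without any Flink dependency, so it can be run on its own. The class name LateRecordRouter and its fields are hypothetical and only for illustration. It assumes a bounded out-of-orderness watermark (largest timestamp seen minus a fixed delay, as with watermarksPeriodicBounded(10000)) and 5-minute tumbling windows as in the query, and it advances the watermark on every record, which is a simplification since Flink emits watermarks periodically. In an actual job this logic would probably sit in a ProcessFunction that reads ctx.timerService().currentWatermark() and emits late records with ctx.output(...) on an OutputTag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models the "route late records to a side output" step.
class LateRecordRouter {
    private final long maxOutOfOrdernessMs; // watermark delay, e.g. 10000
    private final long windowSizeMs;        // tumbling window size, e.g. 5 minutes
    private long maxTimestampSeen = Long.MIN_VALUE;

    final List<Long> onTime = new ArrayList<>(); // stands in for the main output
    final List<Long> late = new ArrayList<>();   // stands in for the side output

    LateRecordRouter(long maxOutOfOrdernessMs, long windowSizeMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.windowSizeMs = windowSizeMs;
    }

    // Bounded out-of-orderness watermark: largest timestamp seen minus the delay.
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxOutOfOrdernessMs;
    }

    // A record is treated as late when the watermark has already passed the
    // last timestamp of its tumbling window (window end - 1, no allowed lateness).
    void process(long timestampMs) {
        long watermark = currentWatermark();
        long windowEnd = timestampMs - Math.floorMod(timestampMs, windowSizeMs) + windowSizeMs;
        if (windowEnd - 1 <= watermark) {
            late.add(timestampMs);
        } else {
            onTime.add(timestampMs);
        }
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
    }

    public static void main(String[] args) {
        LateRecordRouter router = new LateRecordRouter(10_000L, 300_000L);
        for (long ts : new long[] {0L, 320_000L, 299_000L, 305_000L}) {
            router.process(ts);
        }
        System.out.println("on time: " + router.onTime);
        System.out.println("late:    " + router.late);
    }
}
```

Only the records that end up on the on-time path would be registered as the table for the SQL query; the late ones can be logged or written to a separate sink for debugging. Note that a record arriving out of order is not necessarily late: it is only routed aside once its whole window has been closed by the watermark.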