[
https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528540#comment-17528540
]
zhangbin commented on FLINK-27418:
----------------------------------
Thank you for your reply. The useBlinkPlanner method still exists in Flink 1.14,
and the result is still sometimes wrong in Flink 1.14: you get different results
if you execute the test several times.
> Flink SQL TopN result is wrong
> ------------------------------
>
> Key: FLINK-27418
> URL: https://issues.apache.org/jira/browse/FLINK-27418
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.12.2, 1.14.3
> Environment: Flink 1.12.2 and Flink 1.14.3; test results are sometimes
> wrong on both versions
> Reporter: zhangbin
> Priority: Major
>
> Executing the same Flink SQL TopN query multiple times produces different
> results: sometimes the result is correct and sometimes it is incorrect.
> Example:
> {code:java}
> @Test
> public void flinkSqlJoinRetract() {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> streamEnv.setParallelism(1);
> StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(streamEnv, settings);
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));
> RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
> RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
> SourceFunction<Row> waybillSourceFunction =
> buildWaybillStreamSource(waybillTableTypeInfo);
> SourceFunction<Row> itemSourceFunction =
> buildItemStreamSource(itemTableTypeInfo);
> String waybillTable = "waybill";
> String itemTable = "item";
> DataStreamSource<Row> waybillStream = streamEnv.addSource(
> waybillSourceFunction,
> waybillTable,
> waybillTableTypeInfo);
> DataStreamSource<Row> itemStream = streamEnv.addSource(
> itemSourceFunction,
> itemTable,
> itemTableTypeInfo);
> Expression[] waybillFields = ExpressionParser
> .parseExpressionList(String.join(",",
> waybillTableTypeInfo.getFieldNames())
> + ",proctime.proctime").toArray(new Expression[0]);
> Expression[] itemFields = ExpressionParser
> .parseExpressionList(
> String.join(",", itemTableTypeInfo.getFieldNames()) +
> ",proctime.proctime")
> .toArray(new Expression[0]);
> tableEnv.createTemporaryView(waybillTable, waybillStream,
> waybillFields);
> tableEnv.createTemporaryView(itemTable, itemStream, itemFields);
> String sql =
> "select \n"
> + " city_id, \n"
> + " count(*) as cnt\n"
> + "from (\n"
> + " select id,city_id\n"
> + " from (\n"
> + " select \n"
> + " id,\n"
> + " city_id,\n"
> + " row_number() over(partition by id order by
> utime desc ) as rno \n"
> + " from (\n"
> + " select \n"
> + " waybill.id as id,\n"
> + " coalesce(item.city_id, waybill.city_id) as
> city_id,\n"
> + " waybill.utime as utime \n"
> + " from waybill left join item \n"
> + " on waybill.id = item.id \n"
> + " ) \n"
> + " )\n"
> + " where rno =1\n"
> + ")\n"
> + "group by city_id";
> StatementSet statementSet = tableEnv.createStatementSet();
> Table table = tableEnv.sqlQuery(sql);
> DataStream<Tuple2<Boolean, Row>> rowDataStream =
> tableEnv.toRetractStream(table, Row.class);
> rowDataStream.printToErr();
> try {
> streamEnv.execute();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> private static RowTypeInfo buildWaybillTableTypeInfo() {
> TypeInformation[] types = new TypeInformation[]{Types.INT(),
> Types.STRING(), Types.LONG(), Types.LONG()};
> String[] fields = new String[]{"id", "city_id", "rider_id", "utime"};
> return new RowTypeInfo(types, fields);
> }
> private static RowTypeInfo buildItemTableTypeInfo() {
> TypeInformation[] types = new TypeInformation[]{Types.INT(),
> Types.STRING(), Types.LONG()};
> String[] fields = new String[]{"id", "city_id", "utime"};
> return new RowTypeInfo(types, fields);
> }
> //id,city_id,rider_id,utime
> private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo
> rowTypeInfo) {
> return new SourceFunction<Row>() {
> private volatile boolean stopped = false;
> int count = 0;
> int[] ids = {111, 222, 333, 111};
> String[] cityIds = {"A", "A", "B", "A"};
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> while (!stopped) {
> int id = ids[count % ids.length];
> String cityId = cityIds[count % cityIds.length];
> Row row = new Row(4);
> row.setField(0, id);
> row.setField(1, cityId);
> row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
> row.setField(3, System.currentTimeMillis());
> printRow(rowTypeInfo, row);
> ctx.collect(row);
> if (++count > 3) {
> stopped = true;
> }
> }
> }
> @Override
> public void cancel() {
> stopped = true;
> }
> };
> }
> //id,city_id,utime
> private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo
> rowTypeInfo) {
> return new SourceFunction<Row>() {
> private volatile boolean stopped = false;
> int count = 0;
> int[] ids = {111, 333};
> String[] cityIds = {"C", "D"};
> @Override
> public void run(SourceContext<Row> ctx) throws Exception {
> while (!stopped) {
> Thread.sleep(RandomUtils.nextInt(1000, 2000));
> int id = ids[count % ids.length];
> String cityId = cityIds[count % cityIds.length];
> Row row = new Row(3);
> row.setField(0, id);
> row.setField(1, cityId);
> //row.setField(2, System.currentTimeMillis());
> printRow(rowTypeInfo, row);
> ctx.collect(row);
> if (++count >= 2) {
> stopped = true;
> }
> }
> }
> @Override
> public void cancel() {
> stopped = true;
> }
> };
> }
> public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
> String prefix = "";
> for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
> prefix = i > 0 ? "," : "";
> System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" +
> row.getField(i));
> }
> System.out.println();
> }
> {code}
> ------------------------------------------------------------
> |*wrong result*||right result||
> |id:111,city_id:A,rider_id:1137,utime:1650979957702
> id:222,city_id:A,rider_id:1976,utime:1650979957725
> id:333,city_id:B,rider_id:1916,utime:1650979957725
> id:111,city_id:A,rider_id:1345,utime:1650979957725
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (true,C,1)
> (false,A,1)
> (false,C,1)
> (true,C,2)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> C,2
> D,1
> is wrong.|
> id:111,city_id:A,rider_id:1155,utime:1650980662019
> id:222,city_id:A,rider_id:1875,utime:1650980662042
> id:333,city_id:B,rider_id:1430,utime:1650980662042
> id:111,city_id:A,rider_id:1308,utime:1650980662042
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (false,A,2)
> (true,A,1)
> (true,C,1)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> A,1
> C,2
> D,1
> is right.|
>
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)