kevinshin created FLINK-30546:
---------------------------------

             Summary: WindowDeduplicate didn't write result to sink timely
                 Key: FLINK-30546
                 URL: https://issues.apache.org/jira/browse/FLINK-30546
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.15.3
         Environment: flink 1.15.3
            Reporter: kevinshin


Checkpointing on the StreamExecutionEnvironment is triggered every 15s.

The data source is Kafka, built by:

*KafkaSource<MyKafkaRecord> source = KafkaSource.<MyKafkaRecord>builder()*
 *.setBootstrapServers(kafkaServers)*
 *.setTopicPattern(topic)*
 *.setGroupId(consumerGroupId)*
 *.setStartingOffsets(OffsetsInitializer.latest())*
 *.setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserialization(true, true)))*
 *.build();* 

The deserializer copies ConsumerRecord.timestamp into MyKafkaRecord's 
timestamp field.

Then I build a DataStream with:

*DataStream<MyKafkaRecord> stream2 = env.fromSource(source, 
WatermarkStrategy.<MyKafkaRecord>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event,
 recordTimestamp)-> event.getTimestamp()), "Kafka Source");*

MyKafkaRecord contains the raw value of the Kafka record, which is a JSON string. 
I then parse *MyKafkaRecord* into a POJO named *APlog*. APlog has a field 
named eventTime that holds MyKafkaRecord's timestamp field:

*DataStream<APlog> aplogStream = stream2.map(record->{.......})*

*aplogStream.assignTimestampsAndWatermarks(WatermarkStrategy.<APlog>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event,
 recordTimestamp)-> event.getEventTime()));*

Then I convert aplogStream to the table aplog:

*Table aplog = tEnv.fromDataStream(aplogStream, $("userName"), $("userMAC"), 
$("bssid"), $("ssid"), $("apName"), $("radioID"), $("vlanid"), $("action"), 
$("ddate"), $("dtime"), $("rawValue"), $("region"), $("eventTime"), 
$("eventTime_2").rowtime());*

Then I register it as a temporary view:

*tEnv.createTemporaryView("aplog", aplog);*

Then I run a window deduplication on aplog:

*TableResult result = tEnv.executeSql("select * from " +*
 *"(select *, ROW_NUMBER() OVER (" +*
 *"PARTITION BY window_start, window_end, userName ORDER BY eventTime_2 DESC" +*
 *") as row_num from " +*
 *"TABLE(TUMBLE(TABLE aplog, DESCRIPTOR(eventTime_2), INTERVAL '10' SECONDS))" +*
 *") where row_num <= 1");*

*result.print();*
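
For reference, the intended result of the query above can be sketched in plain Java without any Flink dependency: for each 10-second tumbling window and each userName, keep only the row with the largest event time. This is a minimal sketch of the semantics, not of how Flink executes the query. The class `WindowDeduplicateSketch`, the simplified `Row` type, and the sample values are hypothetical, and the sketch assumes windows aligned to the epoch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class WindowDeduplicateSketch {

    /** Hypothetical, simplified stand-in for a row of the aplog table. */
    static final class Row {
        final String userName;
        final long eventTimeMillis;

        Row(String userName, long eventTimeMillis) {
            this.userName = userName;
            this.eventTimeMillis = eventTimeMillis;
        }
    }

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /**
     * Keeps, per (window start, userName), the row with the largest event time.
     * For a tumbling window the end is fixed by the start, so the start alone
     * identifies the window. On equal event times the first row seen is kept.
     */
    static Map<String, Row> lastRowPerWindowAndUser(List<Row> rows, long sizeMillis) {
        Map<String, Row> latest = new LinkedHashMap<>();
        for (Row row : rows) {
            String key = windowStart(row.eventTimeMillis, sizeMillis) + "|" + row.userName;
            Row current = latest.get(key);
            if (current == null || row.eventTimeMillis > current.eventTimeMillis) {
                latest.put(key, row);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("alice", 1_000L));
        rows.add(new Row("alice", 7_000L));
        rows.add(new Row("bob", 3_000L));
        rows.add(new Row("alice", 12_000L));

        // One line per (window start, userName) pair with the retained event time.
        lastRowPerWindowAndUser(rows, 10_000L)
                .forEach((key, row) -> System.out.println(key + " -> " + row.eventTimeMillis));
    }
}
```

Note that the retained row for a window is only known once no further rows for that window can arrive, which is relevant to when such a query can emit output.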

The result is not printed to the console each time a checkpoint completes, and 
I cannot tell when it will be printed. 

But when I try:

*TableResult result = tEnv.executeSql("select * from TABLE(TUMBLE(TABLE aplog, 
DESCRIPTOR(eventTime_2), INTERVAL '10' SECONDS))");*

*result.print();*

I can see the result printed to the console each time a checkpoint completes.
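
One possible reading of this difference, not confirmed in this report: a deduplication over a window can only emit once the window is complete, which in event time means the watermark has passed the window end, whereas a plain TUMBLE table function only attaches window_start and window_end to each row and can emit rows as they arrive. The plain-Java sketch below models that timing under stated assumptions: the watermark is the largest seen timestamp minus the 3-second bound minus 1 ms, a window [start, end) counts as complete once the watermark reaches end - 1, and an operator with several inputs sees the minimum of their watermarks. The class and method names are hypothetical, and the firing rule is a simplified model rather than Flink's actual operator code.

```java
final class WatermarkFiringSketch {

    /** Bounded-out-of-orderness model: largest seen timestamp minus the bound, minus 1 ms. */
    static long watermark(long maxSeenTimestampMillis, long outOfOrdernessMillis) {
        return maxSeenTimestampMillis - outOfOrdernessMillis - 1;
    }

    /** An operator with several inputs advances only as far as its slowest input. */
    static long combinedWatermark(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    /** Assumed firing rule: window [start, end) is complete once watermark >= end - 1. */
    static boolean windowComplete(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long bound = 3_000L;      // matches Duration.ofSeconds(3) in the report
        long windowEnd = 10_000L; // first 10-second tumbling window, [0, 10000)

        // Largest seen event time 12.999s: watermark is 9998, window not yet complete.
        System.out.println(windowComplete(windowEnd, watermark(12_999L, bound))); // false

        // Largest seen event time 13.000s: watermark is 9999, window complete.
        System.out.println(windowComplete(windowEnd, watermark(13_000L, bound))); // true

        // One input has produced no watermark yet (modelled as Long.MIN_VALUE):
        // the combined watermark stays at the minimum, so the window never completes.
        long noWatermarkYet = Long.MIN_VALUE;
        System.out.println(windowComplete(windowEnd,
                combinedWatermark(watermark(60_000L, bound), noWatermarkYet))); // false
    }
}
```

Under this model, output timing depends on watermark progress rather than on the 15s checkpoint interval, so results for a window would appear only after an event at least 3 seconds past the window end has been seen on every input. Whether that is what happens in the reported job is not established here.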



--
This message was sent by Atlassian Jira
(v8.20.10#820010)