[ https://issues.apache.org/jira/browse/FLINK-7569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153638#comment-16153638 ]

olivier sohn commented on FLINK-7569:
-------------------------------------

I'll close the issue, as it is fixed in 1.3.2 (I was testing an old version).

> '1 record' delay for counted windowed streams
> ---------------------------------------------
>
>                 Key: FLINK-7569
>                 URL: https://issues.apache.org/jira/browse/FLINK-7569
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.2
>         Environment: osx
>            Reporter: olivier sohn
>            Priority: Minor
>
> In the example below (described at the end of the class documentation), we
> see that Flink waits for the first element of the next window to be received
> before "computing" a window. I guess this was necessary for time-based
> windows, but this one is count-based, so it should be possible to specialize
> the behaviour: as soon as the window holds the right number of elements, it
> should be processed, instead of waiting for the first element of the next
> window.
> Could this behaviour be implemented?
> {code:java}
> /**
>  * Example of how to read records with a Kafka consumer and write their size-2
>  * windowed sums using a Kafka producer.
>  *
>  *   For example, it transforms
>  *
>  *   1 1 2 1 3 2 1
>  *
>  *   into
>  *
>  *   2 3 5
>  *
>  * Note that the Kafka source and sink expect the following parameters to be
>  * set:
>  *  - "bootstrap.servers" (comma-separated list of Kafka brokers)
>  *  - "zookeeper.connect" (comma-separated list of ZooKeeper servers)
>  *
>  * Note that the Kafka source expects the following parameters to be set:
>  *  - "topicIn": the name of the topic to read data from.
>  *  - "group.id": the id of the consumer group.
>  *
>  * Note that the Kafka sink expects the following parameter to be set:
>  *  - "topicOut": the name of the topic to write data to.
>  *
>  * You can pass these required parameters using
>  * "--bootstrap.servers host:port,host1:port1
>  *  --zookeeper.connect host:port
>  *  --topicIn testTopicIn
>  *  --topicOut testTopicOut
>  *  --group.id myGroup"
>  *
>  * This is a valid input example:
>  *            --topicIn testIn
>  *            --topicOut testOut
>  *            --bootstrap.servers 172.22.12.3:49092
>  *            --zookeeper.connect 172.22.12.3:22181/dev
>  *            --group.id myGroup
>  *
>  *
>  * If LeaderNotAvailableException is raised, it means the topic doesn't exist.
>  *
>  * It can be created using the command:
>  * > kafka-topics --create
>  *                --zookeeper 172.22.12.3:22181/dev
>  *                --partitions 2
>  *                --replication-factor 1
>  *                --topic testTopicOut
>  *
>  * To test that the example works:
>  * ------------------------------
>  *
>  * Listen to the results:
>  * > kafka-console-consumer --bootstrap-server 172.22.12.3:49092
>  *                          --whitelist testTopicOut
>  *
>  * Run this class's main method.
>  *
>  * Create inputs:
>  * > echo 1 | kafka-console-producer
>  *                                   --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
>  *                                   --topic testTopicIn
>  * > echo 2 | kafka-console-producer
>  *                                   --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
>  *                                   --topic testTopicIn
>  * > echo 3 | kafka-console-producer
>  *                                   --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
>  *                                   --topic testTopicIn
>  * > echo 4 | kafka-console-producer
>  *                                   --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
>  *                                   --topic testTopicIn
>  * > echo 5 | kafka-console-producer
>  *                                   --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
>  *                                   --topic testTopicIn
>  *
>  * In the console that listens to the results, you should see:
>  * > 3
>  * > 7
>  *
>  * Note that the last input, '5', is necessary in this case. It seems Flink
>  * waits for the first element of the next window to be received before
>  * computing the current window.
>  */
> public class ReadFromKafkaWriteWindowedSumIntoKafka {
>     public static void main(String[] args) throws Exception {
>         // create flink execution environment
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // parse user parameters
>         ParameterTool parameterTool = ParameterTool.fromArgs(args);
>         // add a kafka source which reads from 'topicIn'
>         DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(
>                 parameterTool.getRequired("topicIn"),
>                 new SimpleStringSchema(),
>                 parameterTool.getProperties()));
>         messageStream
>                 // group stream elements 2 by 2
>                 .window(new Count(2))
>                 // sum the windowed stream
>                 .foldWindow(0, new FoldFunction<String, Integer>() {
>                     @Override
>                     public Integer fold(Integer accumulator, String value) throws Exception {
>                         return accumulator + Integer.parseInt(value);
>                     }
>                 })
>                 // convert DiscretizedStream to DataStream
>                 .flatten()
>                 // convert stream data from Integer to String
>                 .map(new MapFunction<Integer, String>() {
>                     @Override
>                     public String map(Integer value) throws Exception {
>                         return Integer.toString(value);
>                     }
>                 })
>                 // add a kafka sink which writes into 'topicOut'
>                 .addSink(new KafkaSink<>(
>                         parameterTool.getRequired("bootstrap.servers"),
>                         parameterTool.getRequired("topicOut"),
>                         new WriteIntoKafka.SimpleStringSchema()));
>         env.execute();
>     }
> }
> {code}
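To make the difference between the two firing policies concrete, here is a minimal Java sketch that simulates a size-2 tumbling count window summing integer records, outside of Flink. The class and method names (CountWindowSketch, eagerSums, lazySums) are hypothetical and not part of any Flink API; the "lazy" policy only models the behaviour observed in this report, not Flink's actual internals.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (not Flink code) of a tumbling count window of size n
 * that sums integer records, comparing two firing policies:
 *  - eager: emit the sum as soon as the window holds n elements;
 *  - lazy:  emit a full window only when the first element of the next window
 *           arrives (the behaviour described in this issue).
 */
public class CountWindowSketch {

    /** Eager policy: fire and purge as soon as the window reaches its count. */
    static List<Integer> eagerSums(int[] records, int n) {
        List<Integer> out = new ArrayList<>();
        int sum = 0;
        int count = 0;
        for (int r : records) {
            sum += r;
            count++;
            if (count == n) {   // window is full: emit immediately
                out.add(sum);
                sum = 0;
                count = 0;
            }
        }
        return out;             // an incomplete trailing window is never emitted
    }

    /** Lazy policy: a full window is emitted only when the next record arrives. */
    static List<Integer> lazySums(int[] records, int n) {
        List<Integer> out = new ArrayList<>();
        int sum = 0;
        int count = 0;
        for (int r : records) {
            if (count == n) {   // previous window was full: emit it only now
                out.add(sum);
                sum = 0;
                count = 0;
            }
            sum += r;
            count++;
        }
        return out;             // a full window at the end of input stays pending
    }

    public static void main(String[] args) {
        int[] fourRecords = {1, 2, 3, 4};
        System.out.println("eager: " + eagerSums(fourRecords, 2));
        System.out.println("lazy:  " + lazySums(fourRecords, 2));
    }
}
```

Running main prints "eager: [3, 7]" and "lazy:  [3]": with the inputs 1 2 3 4, the lazy policy holds back the second window until a fifth record arrives, which matches the note about '5' in the class documentation. On the input 1 1 2 1 3 2 1, eagerSums yields 2 3 5, as in the documented example.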



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
