[
https://issues.apache.org/jira/browse/FLINK-7569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16153638#comment-16153638
]
olivier sohn commented on FLINK-7569:
-------------------------------------
I'll close the issue as it is fixed in 1.3.2 (I was testing an old version)
> '1 record' delay for counted windowed streams
> ---------------------------------------------
>
> Key: FLINK-7569
> URL: https://issues.apache.org/jira/browse/FLINK-7569
> Project: Flink
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.2
> Environment: osx
> Reporter: olivier sohn
> Priority: Minor
>
> In the example below (described at the end of the class documentation), Flink
> waits for the first element of the next window to be received before
> "computing" a window. I guess this is necessary for time-based windows, but
> here the window is count-based, so it should be possible to specialize the
> behaviour: as soon as the window holds the right count of elements, the
> processing is executed, instead of waiting for the first element of the next
> window.
> Could this behaviour be implemented?
> {code:java}
> /**
> * Example on how to read with a Kafka consumer and write the size-2 windowed
> * sum of records using a Kafka producer.
> *
> * For example, it transforms
> *
> * 1 1 2 1 3 2 1
> *
> * into
> *
> * 2 3 5
> *
> * Note that the Kafka source and sink both expect the following parameters
> * to be set:
> * - "bootstrap.servers" (comma separated list of kafka brokers)
> * - "zookeeper.connect" (comma separated list of zookeeper servers)
> *
> * Note that the Kafka source expects the following parameters to be set:
> * - "topicIn" the name of the topic to read data from.
> * - "group.id" the id of the consumer group
> *
> * Note that the Kafka sink expects the following parameter to be set:
> * - "topicOut" the name of the topic to write data to.
> *
> * You can pass these required parameters using
> * "--bootstrap.servers host:port,host1:port1
> * --zookeeper.connect host:port
> * --topicIn testTopicIn
> * --topicOut testTopicOut"
> *
> * This is a valid input example:
> * --topicIn testIn
> * --topicOut testOut
> * --bootstrap.servers 172.22.12.3:49092
> * --zookeeper.connect 172.22.12.3:22181/dev
> * --group.id myGroup
> *
> * If a LeaderNotAvailableException is raised, the topic doesn't exist.
> *
> * It can be created using the command:
> * > kafka-topics --create
> * --zookeeper 172.22.12.3:22181/dev
> * --partitions 2
> * --replication-factor 1
> * --topic testTopicOut
> *
> * To test that the example works:
> * ------------------------------
> *
> * listen to results:
> * > kafka-console-consumer --bootstrap-server 172.22.12.3:49092
> * --whitelist testTopicOut
> *
> * run this class' main
> *
> * create inputs:
> * > echo 1 | kafka-console-producer --broker-list
> *   172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094 --topic testTopicIn
> * > echo 2 | kafka-console-producer --broker-list
> *   172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094 --topic testTopicIn
> * > echo 3 | kafka-console-producer --broker-list
> *   172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094 --topic testTopicIn
> * > echo 4 | kafka-console-producer --broker-list
> *   172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094 --topic testTopicIn
> * > echo 5 | kafka-console-producer --broker-list
> *   172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094 --topic testTopicIn
> *
> * in the consumer console you should see:
> * > 3
> * > 7
> *
> * Note that the last input, '5', is necessary in this case: Flink seems to
> * wait for the first element of the next window to be received before
> * computing the current window.
> */
> public class ReadFromKafkaWriteWindowedSumIntoKafka {
>     public static void main(String[] args) throws Exception {
>         // create flink execution environment
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // parse user parameters
>         ParameterTool parameterTool = ParameterTool.fromArgs(args);
>         // add a kafka source which reads from 'topicIn'
>         DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(
>                 parameterTool.getRequired("topicIn"),
>                 new SimpleStringSchema(),
>                 parameterTool.getProperties()));
>         messageStream
>                 // group stream elements 2 by 2
>                 .window(new Count(2))
>                 // sum the windowed stream
>                 .foldWindow(0, new FoldFunction<String, Integer>() {
>                     @Override
>                     public Integer fold(Integer accumulator, String value) throws Exception {
>                         return accumulator + Integer.parseInt(value);
>                     }
>                 })
>                 // convert DiscretizedStream to DataStream
>                 .flatten()
>                 // convert stream data from Integer to String
>                 .map(new MapFunction<Integer, String>() {
>                     @Override
>                     public String map(Integer value) throws Exception {
>                         return Integer.toString(value);
>                     }
>                 })
>                 // add a kafka sink which writes into 'topicOut'
>                 .addSink(new KafkaSink<>(
>                         parameterTool.getRequired("bootstrap.servers"),
>                         parameterTool.getRequired("topicOut"),
>                         new WriteIntoKafka.SimpleStringSchema()));
>         env.execute();
>     }
> }
> {code}
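The eager firing the report asks for (emit a count window's result as soon as it holds the required number of elements, rather than when the first element of the next window arrives) can be sketched in plain Java. This is a minimal illustration of the behaviour only, independent of any Flink API; the class and method names are invented for the sketch:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of an eagerly-firing count window: records are summed and a
 * result is emitted the moment `size` records have arrived, without waiting
 * for the first record of the next window. (Illustration only, not Flink.)
 */
public class EagerCountWindow {
    private final int size;
    private int count = 0;
    private int sum = 0;
    private final List<Integer> results = new ArrayList<>();

    public EagerCountWindow(int size) {
        this.size = size;
    }

    /** Add one record; fire immediately when the window is full. */
    public void add(int value) {
        sum += value;
        count++;
        if (count == size) { // window complete: fire now, no delay
            results.add(sum);
            sum = 0;
            count = 0;
        }
    }

    public List<Integer> results() {
        return results;
    }

    public static void main(String[] args) {
        EagerCountWindow w = new EagerCountWindow(2);
        for (int v : new int[] {1, 1, 2, 1, 3, 2, 1}) {
            w.add(v);
        }
        System.out.println(w.results()); // prints [2, 3, 5]
    }
}
```

For the input 1 1 2 1 3 2 1 from the class documentation above, this yields the sums 2, 3 and 5 with no one-record delay; the incomplete final window (the trailing 1) simply never fires.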
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)