afedulov commented on code in PR #21774:
URL: https://github.com/apache/flink/pull/21774#discussion_r1242162954


##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java:
##########
@@ -64,24 +67,19 @@ public static void main(String[] args) throws Exception {
         // We expect to detect session "b" and "c" at this point as well
         input.add(new Tuple3<>("c", 11L, 1));
 
-        DataStream<Tuple3<String, Long, Integer>> source =
-                env.addSource(
-                        new SourceFunction<Tuple3<String, Long, Integer>>() {
-                            private static final long serialVersionUID = 1L;
-
-                            @Override
-                            public void run(SourceContext<Tuple3<String, Long, 
Integer>> ctx)
-                                    throws Exception {
-                                for (Tuple3<String, Long, Integer> value : 
input) {
-                                    ctx.collectWithTimestamp(value, value.f1);

Review Comment:
   Good point, I think I accidentally missed that detail when I worked on this 
part of the PR a while back. I'll add a timestamp extractor. Doing this on the 
level of `DataGeneratorSource` is currently not possible - that would require 
some redesign around a low-level implementation with user-supplied SerDe, 
similar to how KafkaSource does it: 
https://github.com/apache/flink/blob/8db43d7ffeabc7a0f5c67c9cdd964c64f400789b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L60



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to