afedulov commented on code in PR #21774:
URL: https://github.com/apache/flink/pull/21774#discussion_r1242162954
##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java:
##########
@@ -64,24 +67,19 @@ public static void main(String[] args) throws Exception {
// We expect to detect session "b" and "c" at this point as well
input.add(new Tuple3<>("c", 11L, 1));
- DataStream<Tuple3<String, Long, Integer>> source =
- env.addSource(
- new SourceFunction<Tuple3<String, Long, Integer>>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Tuple3<String, Long,
Integer>> ctx)
- throws Exception {
- for (Tuple3<String, Long, Integer> value :
input) {
- ctx.collectWithTimestamp(value, value.f1);
Review Comment:
Good point, I think I accidentally missed that detail when I worked on this
part of the PR a while back. I'll add a timestamp extractor. Doing this on the
level of GeneratorFunction is not possible - that would require some redesign
around a low-level implementation with user-supplied SerDe, similar to how
KafkaSource does it:
https://github.com/apache/flink/blob/8db43d7ffeabc7a0f5c67c9cdd964c64f400789b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L60
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]