afedulov commented on code in PR #21774:
URL: https://github.com/apache/flink/pull/21774#discussion_r1242618518


##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java:
##########
@@ -99,7 +104,20 @@ public static void main(String[] args) throws Exception {
                             .map(new ParseCarData())
                             .name("parse-input");
         } else {
-            carData = env.addSource(CarSource.create(2)).name("in-memory-source");
+            CarGeneratorFunction carGenerator = new CarGeneratorFunction(2);
+            DataGeneratorSource<Tuple4<Integer, Integer, Double, Long>> carGeneratorSource =
+                    new DataGeneratorSource<>(
+                            carGenerator,
+                            Long.MAX_VALUE,
+                            parallelismIgnored -> new GuavaRateLimiter(1),

Review Comment:
   To be honest, it is hard to tell how the original `CarSource` was meant to generate data to begin with. It updates the distance as if one second had passed between events, but it actually loops with a 100 ms delay, which overestimates the distance by a factor of 10. Then there are the magic numbers for `triggerMeters` and eviction:
   
https://github.com/apache/flink/blob/af4e0506ce39e014d56a28d36cfc497bf0aa00f0/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java#L123-L124
   Since the new source does not overestimate the distance, I will reduce the delta window threshold and increase the event rate. This seems to produce results at the same rate as before.
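
   To make the factor of 10 concrete, here is a minimal standalone sketch of the arithmetic. It is not the Flink code: the class `DistanceEstimate`, the method `accumulatedMeters`, and the 20 m/s speed are hypothetical, chosen only for illustration. It compares crediting one second of travel per loop iteration against crediting the 100 ms that actually elapses.

```java
public class DistanceEstimate {

    /**
     * Sums the distance credited over a number of loop iterations.
     * Hypothetical helper: each iteration credits `assumedSecondsPerStep`
     * seconds of travel at a constant speed.
     */
    static double accumulatedMeters(
            double speedMetersPerSecond, int steps, double assumedSecondsPerStep) {
        double distance = 0.0;
        for (int i = 0; i < steps; i++) {
            distance += speedMetersPerSecond * assumedSecondsPerStep;
        }
        return distance;
    }

    public static void main(String[] args) {
        double speed = 20.0; // assumed constant speed in m/s
        int steps = 10;      // ten iterations of a 100 ms loop = 1 s of wall-clock time

        // Credits a full second per iteration although only 100 ms elapsed.
        double credited = accumulatedMeters(speed, steps, 1.0);
        // Credits the time that actually elapsed per iteration.
        double actual = accumulatedMeters(speed, steps, 0.1);

        System.out.printf(
                "credited=%.1f m, actual=%.1f m, ratio=%.1f%n",
                credited, actual, credited / actual);
    }
}
```

   With a distance-based trigger, the inflated distance reaches any fixed threshold roughly ten times sooner, which is why removing the overestimate calls for a smaller threshold, a higher event rate, or both to keep the output cadence similar.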
   
   
   
    


