Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6415#discussion_r205110205
  
    --- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
 ---
    @@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception 
{
                ParameterTool params = ParameterTool.fromArgs(args);
                int loadFactor = 
Integer.parseInt(params.getRequired("loadFactor"));
                String outputPath = params.getRequired("outputPath");
    +           boolean infinite = params.getBoolean("infinite", false);
     
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
     
                int numKeys = loadFactor * 128 * 1024;
    -           DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new 
Generator(numKeys, 1)).setParallelism(4);
    +           DataSet<Tuple2<String, Integer>> x1Keys;
                DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new 
Generator(numKeys * 32, 2)).setParallelism(4);
                DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new 
Generator(numKeys, 8)).setParallelism(4);
     
    +           if (infinite) {
    +                   x1Keys = 
env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0);
    --- End diff --
    
    Yes, I didn't want to pass elements from infinite source down the graph, so 
not to reach some OOMs because of the infinite nature of it. That's why I am 
filtering out those elements.
    
    Do you think we should pass them anyway?


---

Reply via email to