Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6415#discussion_r205110205

    --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java ---
    @@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception {
     ParameterTool params = ParameterTool.fromArgs(args);
     int loadFactor = Integer.parseInt(params.getRequired("loadFactor"));
     String outputPath = params.getRequired("outputPath");
    +boolean infinite = params.getBoolean("infinite", false);
     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     int numKeys = loadFactor * 128 * 1024;
    -DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
    +DataSet<Tuple2<String, Integer>> x1Keys;
     DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
     DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
    +if (infinite) {
    +    x1Keys = env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0);
    --- End diff --

    Yes, I didn't want to pass elements from the infinite source down the graph, so that we don't run into OOMs caused by its unbounded nature. That's why I am filtering those elements out. Do you think we should pass them anyway?
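To make the reasoning concrete, here is a minimal plain-Java sketch, not Flink code. `InfiniteSource`, `drain`, and the constant `-1` value are hypothetical stand-ins, and the sketch assumes that every element produced by the infinite source fails the `>= 0` predicate, which is what "filtering out those elements" suggests. It only shows that the source can keep producing while nothing accumulates downstream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class FilteredInfiniteSourceSketch {

    /** Hypothetical endless source: always has a next element and always emits -1. */
    static final class InfiniteSource implements Iterator<Integer> {
        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public Integer next() {
            return -1;
        }
    }

    /** Pulls up to {@code pulls} elements and keeps only those that pass the filter. */
    static List<Integer> drain(Iterator<Integer> source, Predicate<Integer> keep, int pulls) {
        List<Integer> downstream = new ArrayList<>();
        for (int i = 0; i < pulls && source.hasNext(); i++) {
            Integer value = source.next();
            if (keep.test(value)) {
                downstream.add(value);
            }
        }
        return downstream;
    }

    public static void main(String[] args) {
        // Same predicate shape as in the diff: keep only non-negative values.
        List<Integer> kept = drain(new InfiniteSource(), v -> v >= 0, 1_000_000);
        System.out.println("elements passed downstream: " + kept.size());
    }
}
```

Under these assumptions the downstream list stays empty no matter how many elements are pulled, so memory use does not grow with the runtime of the source.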
---