Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1306#discussion_r44386994
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java ---
    @@ -18,91 +18,34 @@
     
     package org.apache.flink.test.iterative;
     
    -import java.io.Serializable;
    -import java.util.Iterator;
    -
    -import org.apache.flink.api.common.Plan;
    -import org.apache.flink.api.java.record.functions.ReduceFunction;
    -import org.apache.flink.api.java.record.io.CsvOutputFormat;
    -import org.apache.flink.api.java.record.io.TextInputFormat;
    -import org.apache.flink.api.java.record.operators.BulkIteration;
    -import org.apache.flink.api.java.record.operators.FileDataSink;
    -import org.apache.flink.api.java.record.operators.FileDataSource;
    -import org.apache.flink.api.java.record.operators.ReduceOperator;
    -import org.apache.flink.test.util.RecordAPITestBase;
    -import org.apache.flink.types.Record;
    -import org.apache.flink.types.StringValue;
    -import org.apache.flink.util.Collector;
    -import org.junit.Assert;
    -
    -@SuppressWarnings("deprecation")
    -public class IterationWithAllReducerITCase extends RecordAPITestBase {
    -
    -   private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "1\n" + "1\n";
    +import java.util.List;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.operators.IterativeDataSet;
    +import org.apache.flink.test.util.JavaProgramTestBase;
    +
    +public class IterationWithAllReducerITCase extends JavaProgramTestBase {
        private static final String EXPECTED = "1\n";
     
    -   protected String dataPath;
    -   protected String resultPath;
    -
    -   public IterationWithAllReducerITCase(){
    -           setTaskManagerNumSlots(4);
    -   }
    -
        @Override
    -   protected void preSubmit() throws Exception {
    -           dataPath = createTempFile("datapoints.txt", INPUT);
    -           resultPath = getTempFilePath("result");
    -   }
    -   
    -   @Override
    -   protected void postSubmit() throws Exception {
    -           compareResultsByLinesInMemory(EXPECTED, resultPath);
    -   }
    -
    -   @Override
    -   protected Plan getTestJob() {
    -           Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
    -           return plan;
    -   }
    +   protected void testProgram() throws Exception {
    +           ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +           env.setParallelism(4);
     
    -   
    -   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
    +           DataSet<String> initialInput = env.fromElements("1", "1", "1", 
"1", "1", "1", "1", "1");
     
    -           FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
    -           
    -           BulkIteration iteration = new BulkIteration("Loop");
    -           iteration.setInput(initialInput);
    -           iteration.setMaximumNumberOfIterations(5);
    -           
    -           Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
    +           IterativeDataSet<String> iteration = 
initialInput.iterate(5).name("Loop");
     
    -           ReduceOperator sumReduce = ReduceOperator.builder(new 
PickOneReducer())
    -                           .input(iteration.getPartialSolution())
    -                           .name("Compute sum (Reduce)")
    -                           .build();
    -           
    -           iteration.setNextPartialSolution(sumReduce);
    +           DataSet<String> sumReduce = iteration.reduce(new 
ReduceFunction<String>(){
    --- End diff --
    
    A `GroupReduceFunction` would be closer to the original test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to