[ 
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998320#comment-14998320
 ] 

ASF GitHub Bot commented on FLINK-2901:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1306#discussion_r44386994
  
    --- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
 ---
    @@ -18,91 +18,34 @@
     
     package org.apache.flink.test.iterative;
     
    -import java.io.Serializable;
    -import java.util.Iterator;
    -
    -import org.apache.flink.api.common.Plan;
    -import org.apache.flink.api.java.record.functions.ReduceFunction;
    -import org.apache.flink.api.java.record.io.CsvOutputFormat;
    -import org.apache.flink.api.java.record.io.TextInputFormat;
    -import org.apache.flink.api.java.record.operators.BulkIteration;
    -import org.apache.flink.api.java.record.operators.FileDataSink;
    -import org.apache.flink.api.java.record.operators.FileDataSource;
    -import org.apache.flink.api.java.record.operators.ReduceOperator;
    -import org.apache.flink.test.util.RecordAPITestBase;
    -import org.apache.flink.types.Record;
    -import org.apache.flink.types.StringValue;
    -import org.apache.flink.util.Collector;
    -import org.junit.Assert;
    -
    -@SuppressWarnings("deprecation")
    -public class IterationWithAllReducerITCase extends RecordAPITestBase {
    -
    -   private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" + 
"1\n" + "1\n" + "1\n" + "1\n";
    +import java.util.List;
    +import org.apache.flink.api.common.functions.ReduceFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.operators.IterativeDataSet;
    +import org.apache.flink.test.util.JavaProgramTestBase;
    +
    +public class IterationWithAllReducerITCase extends JavaProgramTestBase {
        private static final String EXPECTED = "1\n";
     
    -   protected String dataPath;
    -   protected String resultPath;
    -
    -   public IterationWithAllReducerITCase(){
    -           setTaskManagerNumSlots(4);
    -   }
    -
        @Override
    -   protected void preSubmit() throws Exception {
    -           dataPath = createTempFile("datapoints.txt", INPUT);
    -           resultPath = getTempFilePath("result");
    -   }
    -   
    -   @Override
    -   protected void postSubmit() throws Exception {
    -           compareResultsByLinesInMemory(EXPECTED, resultPath);
    -   }
    -
    -   @Override
    -   protected Plan getTestJob() {
    -           Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
    -           return plan;
    -   }
    +   protected void testProgram() throws Exception {
    +           ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +           env.setParallelism(4);
     
    -   
    -   private static Plan getTestPlanPlan(int numSubTasks, String input, 
String output) {
    +           DataSet<String> initialInput = env.fromElements("1", "1", "1", 
"1", "1", "1", "1", "1");
     
    -           FileDataSource initialInput = new 
FileDataSource(TextInputFormat.class, input, "input");
    -           
    -           BulkIteration iteration = new BulkIteration("Loop");
    -           iteration.setInput(initialInput);
    -           iteration.setMaximumNumberOfIterations(5);
    -           
    -           Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
    +           IterativeDataSet<String> iteration = 
initialInput.iterate(5).name("Loop");
     
    -           ReduceOperator sumReduce = ReduceOperator.builder(new 
PickOneReducer())
    -                           .input(iteration.getPartialSolution())
    -                           .name("Compute sum (Reduce)")
    -                           .build();
    -           
    -           iteration.setNextPartialSolution(sumReduce);
    +           DataSet<String> sumReduce = iteration.reduce(new 
ReduceFunction<String>(){
    --- End diff --
    
    A `GroupReduceFunction` would be closer to the original test.


> Several flink-test ITCases depend on Record API features
> --------------------------------------------------------
>
>                 Key: FLINK-2901
>                 URL: https://issues.apache.org/jira/browse/FLINK-2901
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 0.10
>            Reporter: Fabian Hueske
>            Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend 
> on the Record API including:
> - ITCases for Record API operators in 
> {{flink-tests/src/test/java/org/apache/flink/test/operators}}
> - ITCases for Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobTests}}
> - Record API programs in 
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobs}}
> - Several ITCases for iterations in 
> {{flink-tests/src/test/java/org/apache/flink/test/iterative}}
> - Tests for job canceling in 
> {{flink-tests/src/test/java/org/apache/flink/test/cancelling}}
> - Test for failing jobs in 
> {{flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase}}
> - Optimizer tests in 
> {{flink-tests/src/test/java/org/apache/flink/test/optimizer}}
> - Accumulator test in 
> {{flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase}}
> - Broadcast test in 
> {{flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase}}
> - distributed cache test in 
> {{flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest}}
> and probably a few more.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to