[
https://issues.apache.org/jira/browse/FLINK-2901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998320#comment-14998320
]
ASF GitHub Bot commented on FLINK-2901:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1306#discussion_r44386994
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/iterative/IterationWithAllReducerITCase.java
---
@@ -18,91 +18,34 @@
package org.apache.flink.test.iterative;
-import java.io.Serializable;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.ReduceFunction;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.TextInputFormat;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-
-@SuppressWarnings("deprecation")
-public class IterationWithAllReducerITCase extends RecordAPITestBase {
-
- private static final String INPUT = "1\n" + "1\n" + "1\n" + "1\n" +
"1\n" + "1\n" + "1\n" + "1\n";
+import java.util.List;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+public class IterationWithAllReducerITCase extends JavaProgramTestBase {
private static final String EXPECTED = "1\n";
- protected String dataPath;
- protected String resultPath;
-
- public IterationWithAllReducerITCase(){
- setTaskManagerNumSlots(4);
- }
-
@Override
- protected void preSubmit() throws Exception {
- dataPath = createTempFile("datapoints.txt", INPUT);
- resultPath = getTempFilePath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(EXPECTED, resultPath);
- }
-
- @Override
- protected Plan getTestJob() {
- Plan plan = getTestPlanPlan(parallelism, dataPath, resultPath);
- return plan;
- }
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(4);
-
- private static Plan getTestPlanPlan(int numSubTasks, String input,
String output) {
+ DataSet<String> initialInput = env.fromElements("1", "1", "1",
"1", "1", "1", "1", "1");
- FileDataSource initialInput = new
FileDataSource(TextInputFormat.class, input, "input");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(initialInput);
- iteration.setMaximumNumberOfIterations(5);
-
- Assert.assertTrue(iteration.getMaximumNumberOfIterations() > 1);
+ IterativeDataSet<String> iteration =
initialInput.iterate(5).name("Loop");
- ReduceOperator sumReduce = ReduceOperator.builder(new
PickOneReducer())
- .input(iteration.getPartialSolution())
- .name("Compute sum (Reduce)")
- .build();
-
- iteration.setNextPartialSolution(sumReduce);
+ DataSet<String> sumReduce = iteration.reduce(new
ReduceFunction<String>(){
--- End diff --
A `GroupReduceFunction` would be closer to the original test.
> Several flink-test ITCases depend on Record API features
> --------------------------------------------------------
>
> Key: FLINK-2901
> URL: https://issues.apache.org/jira/browse/FLINK-2901
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 0.10
> Reporter: Fabian Hueske
> Assignee: Chesnay Schepler
>
> There are several ITCases and utility classes in {{flink-tests}} that depend
> on the Record API including:
> - ITCases for Record API operators in
> {{flink-tests/src/test/java/org/apache/flink/test/operators}}
> - ITCases for Record API programs in
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobTests}}
> - Record API programs in
> {{flink-tests/src/test/java/org/apache/flink/test/recordJobs}}
> - Several ITCases for iterations in
> {{flink-tests/src/test/java/org/apache/flink/test/iterative}}
> - Tests for job canceling in
> {{flink-tests/src/test/java/org/apache/flink/test/cancelling}}
> - Test for failing jobs in
> {{flink-tests/src/test/java/org/apache/flink/test/failingPrograms/TaskFailureITCase}}
> - Optimizer tests in
> {{flink-tests/src/test/java/org/apache/flink/test/optimizer}}
> - Accumulator test in
> {{flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorIterativeITCase}}
> - Broadcast test in
> {{flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastBranchingITCase}}
> - Distributed cache test in
> {{flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest}}
> and probably a few more.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)