The two files you committed here "PipelineBreakerTest.java" and
"SelectOneReducer.java" both contain the old license header. I'll try and
see if I can make the license checker more strict.


On Sat, Jul 12, 2014 at 7:32 PM, <[email protected]> wrote:

> [FLINK-1018] Add tests to verify correct placement of pipeline breakers
> with broadcast variables
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ec0b00d6
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ec0b00d6
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ec0b00d6
>
> Branch: refs/heads/master
> Commit: ec0b00d613a4400baf53f5de2361a2271a26ae63
> Parents: a822486
> Author: Stephan Ewen <[email protected]>
> Authored: Fri Jul 11 18:02:52 2014 +0200
> Committer: Stephan Ewen <[email protected]>
> Committed: Sat Jul 12 19:31:26 2014 +0200
>
> ----------------------------------------------------------------------
>  .../compiler/BranchingPlansCompilerTest.java    |  10 +-
>  .../flink/compiler/PipelineBreakerTest.java     | 137 +++++++++++++++++++
>  .../testfunctions/SelectOneReducer.java         |  28 ++++
>  3 files changed, 170 insertions(+), 5 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> index 31dadae..571f4e4 100644
> ---
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> @@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                 }
>         }
>
> +       @SuppressWarnings({ "unchecked", "deprecation" })
>         @Test
>         public void testBranchEachContractType() {
>                 try {
> @@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .name("Reduce 1")
>                                 .build();
>
> -                       @SuppressWarnings("unchecked")
>                         JoinOperator match1 = JoinOperator.builder(new
> DummyMatchStub(), IntValue.class, 0, 0)
>                                 .input1(sourceB, sourceB, sourceC)
>                                 .input2(sourceC)
> @@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .build();
>
>                         FileDataSink sink = new FileDataSink(new
> DummyOutputFormat(), OUT_FILE, cogroup7);
> -       //              sink.addInput(sourceA);
> -       //              sink.addInput(co3);
> -       //              sink.addInput(co4);
> -       //              sink.addInput(co1);
> +                       sink.addInput(sourceA);
> +                       sink.addInput(cogroup3);
> +                       sink.addInput(cogroup4);
> +                       sink.addInput(cogroup1);
>
>                         // return the PACT plan
>                         Plan plan = new Plan(sink, "Branching of each
> contract type");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> new file mode 100644
> index 0000000..4e43a74
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> @@ -0,0 +1,137 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler;
> +
> +import static org.junit.Assert.*;
> +
> +import org.junit.Test;
> +import org.apache.flink.api.common.Plan;
> +import org.apache.flink.api.java.DataSet;
> +import org.apache.flink.api.java.ExecutionEnvironment;
> +import org.apache.flink.api.java.IterativeDataSet;
> +import org.apache.flink.compiler.plan.BulkIterationPlanNode;
> +import org.apache.flink.compiler.plan.OptimizedPlan;
> +import org.apache.flink.compiler.plan.SingleInputPlanNode;
> +import org.apache.flink.compiler.plan.SinkPlanNode;
> +import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
> +import org.apache.flink.compiler.testfunctions.IdentityMapper;
> +import org.apache.flink.compiler.testfunctions.SelectOneReducer;
> +
> +@SuppressWarnings("serial")
> +public class PipelineBreakerTest extends CompilerTestBase {
> +
> +       @Test
> +       public void testPipelineBreakerWithBroadcastVariable() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> source = env.generateSequence(1,
> 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> result = source.map(new
> IdentityMapper<Long>())
> +
>       .map(new IdentityMapper<Long>())
> +
>               .withBroadcastSet(source, "bc");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedAllReduce() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +                       DataSet<Long> bcInput2 = env.generateSequence(1,
> 10);
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(bcInput1, "bc1")
> +
> .withBroadcastSet(bcInput2, "bc2");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedPartialSolution() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +
> +                       DataSet<Long> initialSource =
> env.generateSequence(1, 10);
> +                       IterativeDataSet<Long> iteration =
> initialSource.iterate(100);
> +
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(iteration, "bc2")
> +
> .withBroadcastSet(bcInput1, "bc1");
> +
> +
> +                       iteration.closeWith(result).print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       BulkIterationPlanNode iterationPlanNode =
> (BulkIterationPlanNode) sink.getInput().getSource();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> iterationPlanNode.getRootOfStepFunction();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> new file mode 100644
> index 0000000..492b9f8
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> @@ -0,0 +1,28 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler.testfunctions;
> +
> +import org.apache.flink.api.java.functions.ReduceFunction;
> +
> +public class SelectOneReducer<T> extends ReduceFunction<T> {
> +
> +       private static final long serialVersionUID = 1L;
> +
> +       @Override
> +       public T reduce(T value1, T value2) throws Exception {
> +               return value1;
> +       }
> +}
>
>

Reply via email to