Repository: flink
Updated Branches:
  refs/heads/master 49f5a0179 -> 25ef3240c
[FLINK-2318] Union can be used as BroadcastVariable

This closes #1390

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25ef3240
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25ef3240
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25ef3240

Branch: refs/heads/master
Commit: 25ef3240c504f8e2989af0dcac4e16f471510f39
Parents: 49f5a01
Author: zentol <ches...@apache.org>
Authored: Thu Nov 19 13:20:18 2015 +0100
Committer: zentol <ches...@apache.org>
Committed: Tue Nov 24 22:24:14 2015 +0100

----------------------------------------------------------------------
 .../plantranslate/JobGraphGenerator.java        |  7 +-
 .../runtime/operators/util/TaskConfig.java      |  4 +-
 .../broadcastvars/BroadcastUnionITCase.java     | 74 ++++++++++++++++++++
 3 files changed, 82 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/25ef3240/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index c9140a5..1546631 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -620,10 +620,13 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
                 // the inputs of the union as well, because the optimizer has a separate union
                 // node, which does not exist in the JobGraph. Otherwise, this can result in
                 // deadlocks when closing a branching flow at runtime.
-                if (input.getDataExchangeMode().equals(DataExchangeMode.BATCH)) {
-                    for (Channel in : inputPlanNode.getInputs()) {
+                for (Channel in : inputPlanNode.getInputs()) {
+                    if (input.getDataExchangeMode().equals(DataExchangeMode.BATCH)) {
                         in.setDataExchangeMode(DataExchangeMode.BATCH);
                     }
+                    if (isBroadcast) {
+                        in.setShipStrategy(ShipStrategyType.BROADCAST, in.getDataExchangeMode());
+                    }
                 }
             }
             else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {

http://git-wip-us.apache.org/repos/asf/flink/blob/25ef3240/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 0254c8c..c32c43b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -462,8 +462,10 @@ public class TaskConfig implements Serializable {
 
     public void addBroadcastInputToGroup(int groupIndex) {
         final String grp = BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex;
+        if (!this.config.containsKey(grp)) {
+            this.config.setInteger(NUM_BROADCAST_INPUTS, this.config.getInteger(NUM_BROADCAST_INPUTS, 0) + 1);
+        }
         this.config.setInteger(grp, this.config.getInteger(grp, 0) + 1);
-        this.config.setInteger(NUM_BROADCAST_INPUTS, this.config.getInteger(NUM_BROADCAST_INPUTS, 0) + 1);
     }
 
     public void setInputAsynchronouslyMaterialized(int inputNum, boolean temp) {

http://git-wip-us.apache.org/repos/asf/flink/blob/25ef3240/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
new file mode 100644
index 0000000..080a3de
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/broadcastvars/BroadcastUnionITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.broadcastvars;
+
+import java.util.List;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.Assert;
+
+public class BroadcastUnionITCase extends JavaProgramTestBase {
+    private static final String BC_NAME = "bc";
+
+    @Override
+    protected void testProgram() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+
+        DataSet<Long> input = env.generateSequence(1, 10);
+        DataSet<Long> bc1 = env.generateSequence(1, 5);
+        DataSet<Long> bc2 = env.generateSequence(6, 10);
+
+        List<Long> result = input
+                .map(new Mapper())
+                .withBroadcastSet(bc1.union(bc2), BC_NAME)
+                .reduce(new Reducer())
+                .collect();
+
+        Assert.assertEquals(result.get(0), Long.valueOf(3025));
+    }
+
+    public static class Mapper extends RichMapFunction<Long, Long> {
+        private List<Long> values;
+
+        @Override
+        public void open(Configuration config) {
+            values = getRuntimeContext().getBroadcastVariable(BC_NAME);
+        }
+
+        @Override
+        public Long map(Long value) throws Exception {
+            long sum = 0;
+            for (Long v : values) {
+                sum += value * v;
+            }
+            return sum;
+        }
+    }
+
+    public static class Reducer implements ReduceFunction<Long> {
+        @Override
+        public Long reduce(Long value1, Long value2) throws Exception {
+            return value1 + value2;
+        }
+    }
+}