[ https://issues.apache.org/jira/browse/FLINK-7?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14998583#comment-14998583 ]
ASF GitHub Bot commented on FLINK-7:
------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1255#discussion_r44406470
--- Diff: flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/RangePartitionRewriter.java ---
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.api.common.distributions.CommonRangeBoundaries;
+import org.apache.flink.api.common.operators.UnaryOperatorInformation;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.java.typeutils.RecordTypeInfo;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.udf.AssignRangeIndex;
+import org.apache.flink.runtime.operators.udf.PartitionIDRemoveWrapper;
+import org.apache.flink.runtime.operators.udf.RangeBoundaryBuilder;
+import org.apache.flink.api.java.functions.SampleInCoordinator;
+import org.apache.flink.api.java.functions.SampleInPartition;
+import org.apache.flink.api.java.sampling.IntermediateSampleData;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.MapNode;
+import org.apache.flink.optimizer.dag.MapPartitionNode;
+import org.apache.flink.optimizer.dag.TempMode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.NamedChannel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+public class RangePartitionRewriter implements Visitor<PlanNode> {
+
+	final OptimizedPlan plan;
+
+	public RangePartitionRewriter(OptimizedPlan plan) {
+		this.plan = plan;
+	}
+
+	@Override
+	public boolean preVisit(PlanNode visitable) {
+		return true;
+	}
+
+	@Override
+	public void postVisit(PlanNode visitable) {
+		final List<Channel> outgoingChannels = visitable.getOutgoingChannels();
+		final List<Channel> newOutGoingChannels = new LinkedList<>();
+		final List<Channel> toBeRemoveChannels = new ArrayList<>();
+		for (Channel channel : outgoingChannels) {
+			ShipStrategyType shipStrategy = channel.getShipStrategy();
+			if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {
+				TypeInformation<?> outputType = channel.getSource().getProgramOperator().getOperatorInfo().getOutputType();
+				// Do not optimize for the record type; it is a special case for the range partitioner and should be removed later.
+				if (!(outputType instanceof RecordTypeInfo)) {
+					newOutGoingChannels.addAll(rewriteRangePartitionChannel(channel));
+					toBeRemoveChannels.add(channel);
+				}
+			}
+		}
+
+		for (Channel chan : toBeRemoveChannels) {
+			outgoingChannels.remove(chan);
+		}
+		outgoingChannels.addAll(newOutGoingChannels);
+	}
+
+	private List<Channel> rewriteRangePartitionChannel(Channel channel) {
+		final List<Channel> sourceNewOutputChannels = new ArrayList<>();
+		final PlanNode sourceNode = channel.getSource();
+		final PlanNode targetNode = channel.getTarget();
+		final int sourceParallelism = sourceNode.getParallelism();
+		final int targetParallelism = targetNode.getParallelism();
+		final TypeComparatorFactory<?> comparator = Utils.getShipComparator(channel, this.plan.getOriginalPlan().getExecutionConfig());
+
+		// 1. Fixed-size sample in each partition.
+		final long seed = org.apache.flink.api.java.Utils.RNG.nextLong();
+		final int sampleSize = 20 * targetParallelism;
+		final SampleInPartition sampleInPartition = new SampleInPartition(false, sampleSize, seed);
+		final TypeInformation<?> sourceOutputType = sourceNode.getOptimizerNode().getOperator().getOperatorInfo().getOutputType();
+		final TypeInformation<IntermediateSampleData> isdTypeInformation = TypeExtractor.getForClass(IntermediateSampleData.class);
+		final UnaryOperatorInformation sipOperatorInformation = new UnaryOperatorInformation(sourceOutputType, isdTypeInformation);
+		final MapPartitionOperatorBase sipOperatorBase = new MapPartitionOperatorBase(sampleInPartition, sipOperatorInformation, "Sample in partitions");
+		final MapPartitionNode sipNode = new MapPartitionNode(sipOperatorBase);
+		final Channel sipChannel = new Channel(sourceNode, TempMode.NONE);
+		sipChannel.setShipStrategy(ShipStrategyType.FORWARD, channel.getDataExchangeMode());
+		final SingleInputPlanNode sipPlanNode = new SingleInputPlanNode(sipNode, "SampleInPartition PlanNode", sipChannel, DriverStrategy.MAP_PARTITION);
+		sipPlanNode.setParallelism(sourceParallelism);
+		sipChannel.setTarget(sipPlanNode);
+		this.plan.getAllNodes().add(sipPlanNode);
+		sourceNewOutputChannels.add(sipChannel);
+
+		// 2. Fixed-size sample in a single coordinator.
+		final SampleInCoordinator sampleInCoordinator = new SampleInCoordinator(false, sampleSize, seed);
+		final UnaryOperatorInformation sicOperatorInformation = new UnaryOperatorInformation(isdTypeInformation, sourceOutputType);
+		final GroupReduceOperatorBase sicOperatorBase = new GroupReduceOperatorBase(sampleInCoordinator, sicOperatorInformation, "Sample in coordinator");
+		final GroupReduceNode sicNode = new GroupReduceNode(sicOperatorBase);
+		final Channel sicChannel = new Channel(sipPlanNode, TempMode.NONE);
+		sicChannel.setShipStrategy(ShipStrategyType.PARTITION_HASH, channel.getShipStrategyKeys(), channel.getShipStrategySortOrder(), null, channel.getDataExchangeMode());
+		final SingleInputPlanNode sicPlanNode = new SingleInputPlanNode(sicNode, "SampleInCoordinator PlanNode", sicChannel, DriverStrategy.ALL_GROUP_REDUCE);
+		sicPlanNode.setParallelism(1);
+		sicChannel.setTarget(sicPlanNode);
+		sipPlanNode.addOutgoingChannel(sicChannel);
+		this.plan.getAllNodes().add(sicPlanNode);
+
+		// 3. Use the sampled data to build range boundaries.
+		final RangeBoundaryBuilder rangeBoundaryBuilder = new RangeBoundaryBuilder(comparator, targetParallelism);
+		final TypeInformation<CommonRangeBoundaries> rbTypeInformation = TypeExtractor.getForClass(CommonRangeBoundaries.class);
+		final UnaryOperatorInformation rbOperatorInformation = new UnaryOperatorInformation(sourceOutputType, rbTypeInformation);
+		final MapPartitionOperatorBase rbOperatorBase = new MapPartitionOperatorBase(rangeBoundaryBuilder, rbOperatorInformation, "RangeBoundaryBuilder");
+		final MapPartitionNode rbNode = new MapPartitionNode(rbOperatorBase);
+		final Channel rbChannel = new Channel(sicPlanNode, TempMode.NONE);
+		rbChannel.setShipStrategy(ShipStrategyType.FORWARD, channel.getDataExchangeMode());
+		final SingleInputPlanNode rbPlanNode = new SingleInputPlanNode(rbNode, "RangeBoundary PlanNode", rbChannel, DriverStrategy.MAP_PARTITION);
+		rbPlanNode.setParallelism(1);
+		rbChannel.setTarget(rbPlanNode);
+		sicPlanNode.addOutgoingChannel(rbChannel);
+		this.plan.getAllNodes().add(rbPlanNode);
+
+		// 4. Take the range boundaries as broadcast input and emit a tuple of partition id and record as output.
+		final AssignRangeIndex assignRangeIndex = new AssignRangeIndex(comparator);
+		final TypeInformation<Tuple2> ariOutputTypeInformation = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, sourceOutputType);
+		final UnaryOperatorInformation ariOperatorInformation = new UnaryOperatorInformation(sourceOutputType, ariOutputTypeInformation);
+		final MapPartitionOperatorBase ariOperatorBase = new MapPartitionOperatorBase(assignRangeIndex, ariOperatorInformation, "Assign Range Index");
+		final MapPartitionNode ariNode = new MapPartitionNode(ariOperatorBase);
+		final Channel ariChannel = new Channel(sourceNode, TempMode.NONE);
+		// To avoid deadlock, set the DataExchangeMode of the channel between the source node and this node to BATCH.
+		ariChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.BATCH);
+		final SingleInputPlanNode ariPlanNode = new SingleInputPlanNode(ariNode, "AssignRangeIndex PlanNode", ariChannel, DriverStrategy.MAP_PARTITION);
+		ariPlanNode.setParallelism(sourceParallelism);
+		ariChannel.setTarget(ariPlanNode);
+		this.plan.getAllNodes().add(ariPlanNode);
+		sourceNewOutputChannels.add(ariChannel);
+
+		final NamedChannel broadcastChannel = new NamedChannel("RangeBoundaries", rbPlanNode);
+		broadcastChannel.setShipStrategy(ShipStrategyType.BROADCAST, channel.getDataExchangeMode());
+		broadcastChannel.setTarget(ariPlanNode);
+		List<NamedChannel> broadcastChannels = new ArrayList<>(1);
+		broadcastChannels.add(broadcastChannel);
+		ariPlanNode.setBroadcastInputs(broadcastChannels);
+
+		// 5. Remove the partition id.
+		final Channel partChannel = new Channel(ariPlanNode, channel.getTempMode());
+		partChannel.setDataExchangeMode(channel.getDataExchangeMode());
+		final FieldList keys = new FieldList(0);
+		final boolean[] sortDirection = { true };
+		partChannel.setShipStrategy(channel.getShipStrategy(), keys, sortDirection, channel.getPartitioner(), channel.getDataExchangeMode());
+		ariPlanNode.addOutgoingChannel(channel);
+		partChannel.setLocalStrategy(channel.getLocalStrategy(), keys, sortDirection);
--- End diff ---
Do not set the local strategy here. This should be done on the channel that forwards to the target node.
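
For illustration, a minimal sketch of what that suggestion could look like: leave partChannel without a local strategy and apply the original channel's local strategy on the channel that forwards the id-stripped records into the target node. The name prPlanNode (the plan node wrapping PartitionIDRemoveWrapper, built further down in this method) and the getLocalStrategyKeys()/getLocalStrategySortOrder() accessors are assumptions here, not part of the diff shown above.

	// Sketch only, assumed names as noted above: set the local strategy on the
	// channel into the target node, i.e. after the partition id has been removed.
	final Channel targetInputChannel = new Channel(prPlanNode, channel.getTempMode());
	targetInputChannel.setShipStrategy(ShipStrategyType.FORWARD, channel.getDataExchangeMode());
	targetInputChannel.setLocalStrategy(channel.getLocalStrategy(),
			channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder());
	targetInputChannel.setTarget(targetNode);
	prPlanNode.addOutgoingChannel(targetInputChannel);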
> [GitHub] Enable Range Partitioner
> ---------------------------------
>
> Key: FLINK-7
> URL: https://issues.apache.org/jira/browse/FLINK-7
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Runtime
> Reporter: GitHub Import
> Assignee: Chengxiang Li
> Fix For: pre-apache
>
>
> The range partitioner is currently disabled. We need to implement the
> following aspects:
> 1) Distribution information, if available, must be propagated back together
> with the ordering property.
> 2) A generic bucket lookup structure (currently specific to PactRecord).
> Tests to re-enable after fixing this issue:
> - TeraSortITCase
> - GlobalSortingITCase
> - GlobalSortingMixedOrderITCase
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/7
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: core, enhancement, optimizer,
> Milestone: Release 0.4
> Assignee: [fhueske|https://github.com/fhueske]
> Created at: Fri Apr 26 13:48:24 CEST 2013
> State: open
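
Regarding point 2) of the issue description, a hedged sketch of what a type-generic bucket lookup could look like, independent of PactRecord: binary-search sorted, sample-derived boundaries with a TypeComparator to map each record to a target partition. The class and method names below are illustrative only and are not the CommonRangeBoundaries/AssignRangeIndex classes from the pull request.

	import org.apache.flink.api.common.typeutils.TypeComparator;

	/** Illustrative, type-generic range-bucket lookup; not the PR's implementation. */
	public class RangeBucketLookup<T> {

		private final TypeComparator<T> comparator;
		private final T[] boundaries; // sorted boundaries, one per partition split

		public RangeBucketLookup(TypeComparator<T> comparator, T[] boundaries) {
			this.comparator = comparator;
			this.boundaries = boundaries;
		}

		/** Returns the index of the partition the record is range-partitioned into. */
		public int getBucket(T record) {
			int low = 0;
			int high = boundaries.length - 1;
			// Binary search for the first boundary that is >= the record.
			while (low <= high) {
				int mid = (low + high) >>> 1;
				if (comparator.compare(record, boundaries[mid]) > 0) {
					low = mid + 1;
				} else {
					high = mid - 1;
				}
			}
			// Records larger than all boundaries fall into the last partition.
			return low;
		}
	}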
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)