http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java new file mode 100644 index 0000000..3fabad6 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.operators; + +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.runtime.operators.DriverStrategy; + + +public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor { + + public CrossStreamOuterSecondDescriptor() { + this(true, true); + } + + public CrossStreamOuterSecondDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) { + super(allowBroadcastFirst, allowBroadcastSecond); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + // uniqueness becomes grouping with streamed nested loops + if ((in2.getGroupedFields() == null || in2.getGroupedFields().size() == 0) && + in2.getUniqueFields() != null && in2.getUniqueFields().size() > 0) + { + return LocalProperties.forGrouping(in2.getUniqueFields().iterator().next().toFieldList()); + } else { + return in2.clearUniqueFieldSets(); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java new file mode 100644 index 0000000..81c823f --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + + +public class FilterDescriptor extends OperatorDescriptorSingle { + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.FLAT_MAP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "Filter ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + return Collections.singletonList(new RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java new file mode 100644 index 0000000..b915e45 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + + +public class FlatMapDescriptor extends OperatorDescriptorSingle { + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.FLAT_MAP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "FlatMap ("+node.getOperator().getName()+")", in, DriverStrategy.FLAT_MAP); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + return Collections.singletonList(new RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) + { + gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); + } + gProps.clearUniqueFieldCombinations(); + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return 
lProps.clearUniqueFieldSets(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java new file mode 100644 index 0000000..b648386 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.operators; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +/** + * The properties file belonging to the GroupCombineNode. It translates the GroupCombine operation + * to the driver strategy SORTED_GROUP_COMBINE and sets the relevant grouping and sorting keys. 
+ * @see org.apache.flink.optimizer.dag.GroupCombineNode + */ +public final class GroupCombineProperties extends OperatorDescriptorSingle { + + private final Ordering ordering; // ordering that we need to use if an additional ordering is requested + + public GroupCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) { + super(groupKeys); + + // if we have an additional ordering, construct the ordering to have primarily the grouping fields + + this.ordering = new Ordering(); + for (Integer key : this.keyList) { + this.ordering.appendOrdering(key, null, Order.ANY); + } + + // and next the additional order fields + if (additionalOrderKeys != null) { + for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) { + Integer field = additionalOrderKeys.getFieldNumber(i); + Order order = additionalOrderKeys.getOrder(i); + this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order); + } + } + + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.SORTED_GROUP_COMBINE; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + node.setDegreeOfParallelism(in.getSource().getParallelism()); + + // sorting key info + SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode( + node, + "GroupCombine (" + node.getOperator().getName() + ")", + in, // reuse the combine strategy also used in the group reduce + DriverStrategy.SORTED_GROUP_COMBINE, this.keyList); + + // set sorting comparator key info + singleInputPlanNode.setDriverKeyInfo(this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections(), 0); + // set grouping comparator key info + singleInputPlanNode.setDriverKeyInfo(this.keyList, 1); + + return singleInputPlanNode; + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties props = new RequestedGlobalProperties(); + props.setRandomPartitioning(); + return 
Collections.singletonList(props); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + return Collections.singletonList(new RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) { + gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); + } + gProps.clearUniqueFieldCombinations(); + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps.clearUniqueFieldSets(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java new file mode 100644 index 0000000..ebd09f2 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +public final class GroupReduceProperties extends OperatorDescriptorSingle { + + private final Ordering ordering; // ordering that we need to use if an additional ordering is requested + + private final Partitioner<?> customPartitioner; + + + public GroupReduceProperties(FieldSet keys) { + this(keys, null, null); + } + + public GroupReduceProperties(FieldSet keys, Ordering additionalOrderKeys) { + this(keys, additionalOrderKeys, null); + } + + public GroupReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) { + this(keys, null, customPartitioner); + } + + public GroupReduceProperties(FieldSet groupKeys, Ordering 
additionalOrderKeys, Partitioner<?> customPartitioner) { + super(groupKeys); + + // if we have an additional ordering, construct the ordering to have primarily the grouping fields + if (additionalOrderKeys != null) { + this.ordering = new Ordering(); + for (Integer key : this.keyList) { + this.ordering.appendOrdering(key, null, Order.ANY); + } + + // and next the additional order fields + for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) { + Integer field = additionalOrderKeys.getFieldNumber(i); + Order order = additionalOrderKeys.getOrder(i); + this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order); + } + } + else { + this.ordering = null; + } + + this.customPartitioner = customPartitioner; + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.SORTED_GROUP_REDUCE; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "GroupReduce ("+node.getOperator().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties props = new RequestedGlobalProperties(); + + if (customPartitioner == null) { + props.setAnyPartitioning(this.keys); + } else { + props.setCustomPartitioned(this.keys, this.customPartitioner); + } + return Collections.singletonList(props); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + RequestedLocalProperties props = new RequestedLocalProperties(); + if (this.ordering == null) { + props.setGroupedFields(this.keys); + } else { + props.setOrdering(this.ordering); + } + return Collections.singletonList(props); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && + gProps.getPartitioning() == 
PartitioningProperty.RANDOM_PARTITIONED) + { + gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); + } + gProps.clearUniqueFieldCombinations(); + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps.clearUniqueFieldSets(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java new file mode 100644 index 0000000..c4f47d3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.costs.Costs; +import org.apache.flink.optimizer.dag.GroupReduceNode; +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.io.network.DataExchangeMode; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.shipping.ShipStrategyType; +import org.apache.flink.runtime.operators.util.LocalStrategy; + +public final class GroupReduceWithCombineProperties extends OperatorDescriptorSingle { + + private final Ordering ordering; // ordering that we need to use if an additional ordering is requested + + private final Partitioner<?> customPartitioner; + + + public GroupReduceWithCombineProperties(FieldSet groupKeys) { + this(groupKeys, null, null); + } + + public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) { + this(groupKeys, additionalOrderKeys, null); + } + + public GroupReduceWithCombineProperties(FieldSet groupKeys, Partitioner<?> customPartitioner) { + this(groupKeys, null, customPartitioner); + } + + public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys, Partitioner<?> 
customPartitioner) { + super(groupKeys); + + // if we have an additional ordering, construct the ordering to have primarily the grouping fields + if (additionalOrderKeys != null) { + this.ordering = new Ordering(); + for (Integer key : this.keyList) { + this.ordering.appendOrdering(key, null, Order.ANY); + } + + // and next the additional order fields + for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) { + Integer field = additionalOrderKeys.getFieldNumber(i); + Order order = additionalOrderKeys.getOrder(i); + this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order); + } + } else { + this.ordering = null; + } + + this.customPartitioner = customPartitioner; + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.SORTED_GROUP_REDUCE; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + if (in.getShipStrategy() == ShipStrategyType.FORWARD) { + // adjust a sort (changes grouping, so it must be for this driver to combining sort + if (in.getLocalStrategy() == LocalStrategy.SORT) { + if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) { + throw new RuntimeException("Bug: Inconsistent sort for group strategy."); + } + in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), + in.getLocalStrategySortOrder()); + } + return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in, + DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); + } else { + // non forward case. 
all local properties are killed anyways, so we can safely plug in a combiner + Channel toCombiner = new Channel(in.getSource()); + toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED); + + // create an input node for combine with same DOP as input node + GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); + combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); + + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator() + .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE); + combiner.setCosts(new Costs(0, 0)); + combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); + // set sorting comparator key info + combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); + // set grouping comparator key info + combiner.setDriverKeyInfo(this.keyList, 1); + + Channel toReducer = new Channel(combiner); + toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), + in.getShipStrategySortOrder(), in.getDataExchangeMode()); + toReducer.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), + in.getLocalStrategySortOrder()); + + return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", + toReducer, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList); + } + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties props = new RequestedGlobalProperties(); + if (customPartitioner == null) { + props.setAnyPartitioning(this.keys); + } else { + props.setCustomPartitioned(this.keys, this.customPartitioner); + } + return Collections.singletonList(props); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + RequestedLocalProperties props = new RequestedLocalProperties(); + if (this.ordering == null) { + 
props.setGroupedFields(this.keys); + } else { + props.setOrdering(this.ordering); + } + return Collections.singletonList(props); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) + { + gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); + } + gProps.clearUniqueFieldCombinations(); + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps.clearUniqueFieldSets(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java new file mode 100644 index 0000000..fec72a9 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +/** + * + */ +public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor { + + public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2) { + super(keys1, keys2); + } + + public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2, + boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) + { + super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.HYBRIDHASH_BUILD_FIRST; + } + + @Override + protected List<LocalPropertiesPair> createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties 
produced2) + { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + DriverStrategy strategy; + + if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) { + // sanity check that the first input is cached and remove that cache + if (!in1.getTempMode().isCached()) { + throw new CompilerException("No cache at point where static and dynamic parts meet."); + } + in1.setTempMode(in1.getTempMode().makeNonCached()); + strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED; + } + else { + strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST; + } + return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + return new LocalProperties(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java new file mode 100644 index 0000000..f9d1e6c --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor { + + public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2) { + super(keys1, keys2); + } + + public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2, + boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) + { + super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.HYBRIDHASH_BUILD_SECOND; + } + + @Override + protected List<LocalPropertiesPair> createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new LocalPropertiesPair( + new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties 
produced2) + { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + DriverStrategy strategy; + + if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) { + // sanity check that the first input is cached and remove that cache + if (!in2.getTempMode().isCached()) { + throw new CompilerException("No cache at point where static and dynamic parts meet."); + } + + in2.setTempMode(in2.getTempMode().makeNonCached()); + strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED; + } + else { + strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND; + } + return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + return new LocalProperties(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java new file mode 100644 index 0000000..9f14d2a --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + + +public class MapDescriptor extends OperatorDescriptorSingle { + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.MAP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.MAP); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + return Collections.singletonList(new RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { 
+ return lProps; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java new file mode 100644 index 0000000..1489097 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + + +public class MapPartitionDescriptor extends OperatorDescriptorSingle { + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.MAP_PARTITION; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "MapPartition ("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties rgp = new RequestedGlobalProperties(); + rgp.setAnyDistribution(); + return Collections.singletonList(rgp); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + return Collections.singletonList(new RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java new file mode 100644 index 0000000..7ae35c3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + + +public class NoOpDescriptor extends OperatorDescriptorSingle { + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.UNARY_NO_OP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "Pipe", in, DriverStrategy.UNARY_NO_OP); + } + + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + return Collections.singletonList(new RequestedGlobalProperties()); + } + + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + return Collections.singletonList(new RequestedLocalProperties()); + } + + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + return gProps; + } + + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java new file mode 100644 index 
0000000..c21593e --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.operators; + +import java.util.List; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; + +/** + * + */ +public abstract class OperatorDescriptorDual implements AbstractOperatorDescriptor { + + protected final FieldList keys1; + protected final FieldList keys2; + + private List<GlobalPropertiesPair> globalProps; + private List<LocalPropertiesPair> localProps; + + protected OperatorDescriptorDual() { + this(null, null); + } + + protected OperatorDescriptorDual(FieldList 
keys1, FieldList keys2) { + this.keys1 = keys1; + this.keys2 = keys2; + } + + public List<GlobalPropertiesPair> getPossibleGlobalProperties() { + if (this.globalProps == null) { + this.globalProps = createPossibleGlobalProperties(); + } + + return this.globalProps; + } + + public List<LocalPropertiesPair> getPossibleLocalProperties() { + if (this.localProps == null) { + this.localProps = createPossibleLocalProperties(); + } + + return this.localProps; + } + + protected abstract List<GlobalPropertiesPair> createPossibleGlobalProperties(); + + protected abstract List<LocalPropertiesPair> createPossibleLocalProperties(); + + public abstract boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, + GlobalProperties produced1, GlobalProperties produced2); + + public abstract boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2); + + public abstract DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node); + + public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2); + + public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2); + + protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) { + + // check number of produced partitioning fields + if(fields1.size() != fields2.size()) { + return false; + } else { + return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size()); + } + } + + protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) { + + // check number of produced partitioning fields + if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) { + return false; + } + else { + for(int i=0; i<numRelevantFields; i++) { + int pField1 = fields1.get(i); + int pField2 = fields2.get(i); + // 
check if position of both produced fields is the same in both requested fields + int j; + for(j=0; j<this.keys1.size(); j++) { + if(this.keys1.get(j) == pField1 && this.keys2.get(j) == pField2) { + break; + } + else if(this.keys1.get(j) != pField1 && this.keys2.get(j) != pField2) { + // do nothing + } + else { + return false; + } + } + if(j == this.keys1.size()) { + throw new CompilerException("Fields were not found in key fields."); + } + } + } + return true; + } + + // -------------------------------------------------------------------------------------------- + + public static final class GlobalPropertiesPair { + + private final RequestedGlobalProperties props1, props2; + + public GlobalPropertiesPair(RequestedGlobalProperties props1, RequestedGlobalProperties props2) { + this.props1 = props1; + this.props2 = props2; + } + + public RequestedGlobalProperties getProperties1() { + return this.props1; + } + + public RequestedGlobalProperties getProperties2() { + return this.props2; + } + + @Override + public int hashCode() { + return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == GlobalPropertiesPair.class) { + final GlobalPropertiesPair other = (GlobalPropertiesPair) obj; + + return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) && + (this.props2 == null ? 
other.props2 == null : this.props2.equals(other.props2)); + } + return false; + } + + @Override + public String toString() { + return "{" + this.props1 + " / " + this.props2 + "}"; + } + } + + public static final class LocalPropertiesPair { + + private final RequestedLocalProperties props1, props2; + + public LocalPropertiesPair(RequestedLocalProperties props1, RequestedLocalProperties props2) { + this.props1 = props1; + this.props2 = props2; + } + + public RequestedLocalProperties getProperties1() { + return this.props1; + } + + public RequestedLocalProperties getProperties2() { + return this.props2; + } + + @Override + public int hashCode() { + return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode()); + } + + @Override + public boolean equals(Object obj) { + if (obj.getClass() == LocalPropertiesPair.class) { + final LocalPropertiesPair other = (LocalPropertiesPair) obj; + + return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) && + (this.props2 == null ? other.props2 == null : this.props2.equals(other.props2)); + } + return false; + } + + @Override + public String toString() { + return "{" + this.props1 + " / " + this.props2 + "}"; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java new file mode 100644 index 0000000..c8be5d4 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.operators; + +import java.util.List; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; + +/** + * Abstract base class for Operator descriptions which instantiates the node and sets the driver + * strategy and the sorting and grouping keys. Returns possible local and global properties and + * updates them after the operation has been performed. 
+ * @see org.apache.flink.compiler.dag.SingleInputNode + */ +public abstract class OperatorDescriptorSingle implements AbstractOperatorDescriptor { + + protected final FieldSet keys; // the set of key fields + protected final FieldList keyList; // the key fields with ordered field positions + + private List<RequestedGlobalProperties> globalProps; + private List<RequestedLocalProperties> localProps; + + + protected OperatorDescriptorSingle() { + this(null); + } + + protected OperatorDescriptorSingle(FieldSet keys) { + this.keys = keys; + this.keyList = keys == null ? null : keys.toFieldList(); + } + + + public List<RequestedGlobalProperties> getPossibleGlobalProperties() { + if (this.globalProps == null) { + this.globalProps = createPossibleGlobalProperties(); + } + return this.globalProps; + } + + public List<RequestedLocalProperties> getPossibleLocalProperties() { + if (this.localProps == null) { + this.localProps = createPossibleLocalProperties(); + } + return this.localProps; + } + + /** + * Returns a list of global properties that are required by this operator descriptor. + * + * @return A list of global properties that are required by this operator descriptor. + */ + protected abstract List<RequestedGlobalProperties> createPossibleGlobalProperties(); + + /** + * Returns a list of local properties that are required by this operator descriptor. + * + * @return A list of local properties that are required by this operator descriptor. + */ + protected abstract List<RequestedLocalProperties> createPossibleLocalProperties(); + + public abstract SingleInputPlanNode instantiate(Channel in, SingleInputNode node); + + /** + * Returns the global properties which are present after the operator was applied on the + * provided global properties. + * + * @param in The global properties on which the operator is applied. + * @return The global properties which are valid after the operator has been applied. 
+ */ + public abstract GlobalProperties computeGlobalProperties(GlobalProperties in); + + /** + * Returns the local properties which are present after the operator was applied on the + * provided local properties. + * + * @param in The local properties on which the operator is applied. + * @return The local properties which are valid after the operator has been applied. + */ + public abstract LocalProperties computeLocalProperties(LocalProperties in); +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java new file mode 100644 index 0000000..2bde29b --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.optimizer.dag.GroupReduceNode; +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.PartitioningProperty; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +public final class PartialGroupProperties extends OperatorDescriptorSingle { + + public PartialGroupProperties(FieldSet keys) { + super(keys); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.SORTED_GROUP_COMBINE; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + // create in input node for combine with same DOP as input node + GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator()); + combinerNode.setDegreeOfParallelism(in.getSource().getParallelism()); + + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in, + DriverStrategy.SORTED_GROUP_COMBINE); + // sorting key info + combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0); + // set grouping comparator key info + combiner.setDriverKeyInfo(this.keyList, 1); + + return combiner; + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + return Collections.singletonList(new 
RequestedGlobalProperties()); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + RequestedLocalProperties props = new RequestedLocalProperties(); + props.setGroupedFields(this.keys); + return Collections.singletonList(props); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 && + gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) + { + gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); + } + gProps.clearUniqueFieldCombinations(); + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps.clearUniqueFieldSets(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java new file mode 100644 index 0000000..5bb51f3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.ReduceNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+/**
+ * Operator descriptor for a keyed Reduce that runs with the {@link DriverStrategy#SORTED_REDUCE}
+ * strategy. Depending on the input channel, {@link #instantiate(Channel, SingleInputNode)} either
+ * creates the reducer directly or places a {@link DriverStrategy#SORTED_PARTIAL_REDUCE} combiner
+ * in front of it.
+ */
+public final class ReduceProperties extends OperatorDescriptorSingle {
+
+	/** Partitioner requested for the keys, or {@code null} to accept any partitioning. */
+	private final Partitioner<?> customPartitioner;
+
+	/**
+	 * Creates a descriptor without a custom partitioner.
+	 *
+	 * @param keys the reduce keys
+	 */
+	public ReduceProperties(FieldSet keys) {
+		this(keys, null);
+	}
+
+	/**
+	 * Creates a descriptor for the given keys.
+	 *
+	 * @param keys the reduce keys
+	 * @param customPartitioner partitioner to request for the keys; {@code null} accepts any partitioning
+	 */
+	public ReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) {
+		super(keys);
+		this.customPartitioner = customPartitioner;
+	}
+
+	@Override
+	public DriverStrategy getStrategy() {
+		return DriverStrategy.SORTED_REDUCE;
+	}
+
+	/**
+	 * Creates the plan node for this reduce.
+	 *
+	 * <p>If the input channel uses a FORWARD ship strategy, or the node has broadcast inputs, a single
+	 * SORTED_REDUCE node is created directly on the given channel. Otherwise a SORTED_PARTIAL_REDUCE
+	 * combiner is created. The combiner reads from the input's source over a pipelined FORWARD channel
+	 * and runs with the source's parallelism. The reducer then reads from the combiner using the
+	 * original channel's ship strategy, keys, sort order and data exchange mode, plus a local SORT.
+	 *
+	 * @param in the input channel of the reduce
+	 * @param node the optimizer node; cast to {@link ReduceNode} in the combiner branch
+	 * @return the plan node of the (final) reducer
+	 */
+	@Override
+	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
+				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
+		{
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
+							DriverStrategy.SORTED_REDUCE, this.keyList);
+		}
+		else {
+			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
+			Channel toCombiner = new Channel(in.getSource());
+			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+
+			// create an input node for combine with same DOP as input node
+			// NOTE(review): assumes node is a ReduceNode; any other type fails with a ClassCastException here
+			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
+			combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+
+			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
+								"Combine ("+node.getOperator().getName()+")", toCombiner,
+								DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
+
+			// NOTE(review): the combiner is assigned zero costs; presumably so it does not penalize
+			// this plan candidate — confirm
+			combiner.setCosts(new Costs(0, 0));
+			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
+
+			// the reducer input re-applies the original shipping and sorts locally on the original keys
+			Channel toReducer = new Channel(combiner);
+			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+										in.getShipStrategySortOrder(), in.getDataExchangeMode());
+			toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
+
+			// NOTE(review): this name is "Reduce(" while the forward branch uses "Reduce (" — confirm intended
+			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
+											DriverStrategy.SORTED_REDUCE, this.keyList);
+		}
+	}
+
+	/**
+	 * Requests any partitioning on the reduce keys. If a custom partitioner was configured, a custom
+	 * partitioning with that partitioner is requested instead.
+	 */
+	@Override
+	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+		RequestedGlobalProperties props = new RequestedGlobalProperties();
+		if (customPartitioner == null) {
+			props.setAnyPartitioning(this.keys);
+		} else {
+			props.setCustomPartitioned(this.keys, this.customPartitioner);
+		}
+		return Collections.singletonList(props);
+	}
+
+	/** Requests the input to be grouped on the reduce keys. */
+	@Override
+	protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+		RequestedLocalProperties props = new RequestedLocalProperties();
+		props.setGroupedFields(this.keys);
+		return Collections.singletonList(props);
+	}
+
+	/**
+	 * Derives the global properties of the reduce output. If the input is RANDOM_PARTITIONED but carries
+	 * a unique field combination, it is marked as any-partitioned on the first such combination. Unique
+	 * field combinations are cleared in all cases. The given object is modified and returned.
+	 */
+	@Override
+	public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+		if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+				gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
+		{
+			gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+		}
+		gProps.clearUniqueFieldCombinations();
+		return gProps;
+	}
+
+	/** Returns the result of clearing the unique field sets of the given local properties. */
+	@Override
+	public LocalProperties computeLocalProperties(LocalProperties lProps) {
+		return lProps.clearUniqueFieldSets();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
new file mode 100644
index 0000000..1dcd87d
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.dag.SingleInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.SingleInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +/** + * + */ +public class SolutionSetDeltaOperator extends OperatorDescriptorSingle { + + public SolutionSetDeltaOperator(FieldList partitioningFields) { + super(partitioningFields); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.UNARY_NO_OP; + } + + @Override + public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) { + return new SingleInputPlanNode(node, "SolutionSet Delta", in, DriverStrategy.UNARY_NO_OP); + } + + @Override + protected List<RequestedGlobalProperties> createPossibleGlobalProperties() { + RequestedGlobalProperties partProps = new RequestedGlobalProperties(); + partProps.setHashPartitioned(this.keyList); + return Collections.singletonList(partProps); + } + + @Override + protected List<RequestedLocalProperties> createPossibleLocalProperties() { + return Collections.singletonList(new 
RequestedLocalProperties()); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties gProps) { + return gProps; + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties lProps) { + return lProps; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java new file mode 100644 index 0000000..356836a --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.util.Utils; +import org.apache.flink.runtime.operators.DriverStrategy; + +/** + * + */ +public class SortMergeJoinDescriptor extends AbstractJoinDescriptor { + + public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) { + super(keys1, keys2); + } + + public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2, + boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed) + { + super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.MERGE; + } + + @Override + protected List<LocalPropertiesPair> createPossibleLocalProperties() { + RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1)); + RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2)); + return Collections.singletonList(new LocalPropertiesPair(sort1, sort2)); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) + { + int numRelevantFields = this.keys1.size(); + + Ordering prod1 = produced1.getOrdering(); + Ordering prod2 = produced2.getOrdering(); + + if (prod1 == null || prod2 == null) { + throw new CompilerException("The given 
properties do not meet this operators requirements."); + } + + // check that order of fields is equivalent + if (!checkEquivalentFieldPositionsInKeyFields( + prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) { + return false; + } + + // check that both inputs have the same directions of order + for (int i = 0; i < numRelevantFields; i++) { + if (prod1.getOrder(i) != prod2.getOrder(i)) { + return false; + } + } + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections(); + + if (inputOrders == null || inputOrders.length < this.keys1.size()) { + throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator."); + } else if (inputOrders.length > this.keys1.size()) { + boolean[] tmp = new boolean[this.keys1.size()]; + System.arraycopy(inputOrders, 0, tmp, 0, tmp.length); + inputOrders = tmp; + } + + return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + LocalProperties comb = LocalProperties.combine(in1, in2); + return comb.clearUniqueFieldSets(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java new file mode 100644 index 0000000..c42cff2 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java 
@@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.optimizer.operators; + +import java.util.Collections; +import java.util.List; + +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.dag.SinkJoiner; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.GlobalProperties; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.optimizer.plan.SinkJoinerPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +/** + * + */ +public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual { + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.BINARY_NO_OP; + } + + @Override + protected List<GlobalPropertiesPair> createPossibleGlobalProperties() { + // all properties are possible + return Collections.singletonList(new GlobalPropertiesPair( + new 
RequestedGlobalProperties(), new RequestedGlobalProperties())); + } + + @Override + protected List<LocalPropertiesPair> createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new LocalPropertiesPair( + new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2, + GlobalProperties produced1, GlobalProperties produced2) { + return true; + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + if (node instanceof SinkJoiner) { + return new SinkJoinerPlanNode((SinkJoiner) node, in1, in2); + } else { + throw new CompilerException(); + } + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + return new LocalProperties(); + } + + @Override + public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) { + return GlobalProperties.combine(in1, in2); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java new file mode 100644 index 0000000..bf22fb3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.optimizer.plan; + +import org.apache.flink.optimizer.dag.BinaryUnionNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +/** + * A special subclass for the union to make it identifiable. + */ +public class BinaryUnionPlanNode extends DualInputPlanNode { + + /** + * @param template + */ + public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, Channel in2) { + super(template, "Union", in1, in2, DriverStrategy.UNION); + } + + public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) { + super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", toSwapFrom.getInput2(), toSwapFrom.getInput1(), + DriverStrategy.UNION_WITH_CACHED); + + this.globalProps = toSwapFrom.globalProps; + this.localProps = toSwapFrom.localProps; + this.nodeCosts = toSwapFrom.nodeCosts; + this.cumulativeCosts = toSwapFrom.cumulativeCosts; + + setParallelism(toSwapFrom.getParallelism()); + } + + public BinaryUnionNode getOptimizerNode() { + return (BinaryUnionNode) this.template; + } + + public boolean unionsStaticAndDynamicPath() { + return getInput1().isOnDynamicPath() != getInput2().isOnDynamicPath(); + } + + @Override + public int getMemoryConsumerWeight() { + return 0; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java new file mode 100644 index 0000000..e79e2f3 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.optimizer.plan; + +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE; +import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM; + +import java.util.HashMap; + +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.costs.Costs; +import org.apache.flink.optimizer.dag.BulkIterationNode; +import org.apache.flink.optimizer.dag.OptimizerNode; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.util.Visitor; + +public class BulkIterationPlanNode extends SingleInputPlanNode implements IterationPlanNode { + + private final BulkPartialSolutionPlanNode partialSolutionPlanNode; + + private final PlanNode rootOfStepFunction; + + private PlanNode rootOfTerminationCriterion; + + private TypeSerializerFactory<?> serializerForIterationChannel; + + // -------------------------------------------------------------------------------------------- + + public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input, + BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction) + { + super(template, nodeName, input, DriverStrategy.NONE); + this.partialSolutionPlanNode = pspn; + this.rootOfStepFunction = rootOfStepFunction; + + mergeBranchPlanMaps(); + } + + public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input, + BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction, PlanNode rootOfTerminationCriterion) + { + this(template, nodeName, input, pspn, rootOfStepFunction); + this.rootOfTerminationCriterion = rootOfTerminationCriterion; + } + + // -------------------------------------------------------------------------------------------- + + public BulkIterationNode getIterationNode() { + if (this.template instanceof BulkIterationNode) { + return (BulkIterationNode) this.template; + } 
else { + throw new RuntimeException(); + } + } + + public BulkPartialSolutionPlanNode getPartialSolutionPlanNode() { + return this.partialSolutionPlanNode; + } + + public PlanNode getRootOfStepFunction() { + return this.rootOfStepFunction; + } + + public PlanNode getRootOfTerminationCriterion() { + return this.rootOfTerminationCriterion; + } + + // -------------------------------------------------------------------------------------------- + + + public TypeSerializerFactory<?> getSerializerForIterationChannel() { + return serializerForIterationChannel; + } + + public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) { + this.serializerForIterationChannel = serializerForIterationChannel; + } + + public void setCosts(Costs nodeCosts) { + // add the costs from the step function + nodeCosts.addCosts(this.rootOfStepFunction.getCumulativeCosts()); + + // add the costs for the termination criterion, if it exists + // the costs are divided at branches, so we can simply add them up + if (rootOfTerminationCriterion != null) { + nodeCosts.addCosts(this.rootOfTerminationCriterion.getCumulativeCosts()); + } + + super.setCosts(nodeCosts); + } + + public int getMemoryConsumerWeight() { + return 1; + } + + + @Override + public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) { + if (source == this) { + return FOUND_SOURCE; + } + + SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source); + + if (fromOutside == FOUND_SOURCE_AND_DAM) { + return FOUND_SOURCE_AND_DAM; + } + else if (fromOutside == FOUND_SOURCE) { + // we always have a dam in the back channel + return FOUND_SOURCE_AND_DAM; + } else { + // check the step function for dams + return this.rootOfStepFunction.hasDamOnPathDownTo(source); + } + } + + @Override + public void acceptForStepFunction(Visitor<PlanNode> visitor) { + this.rootOfStepFunction.accept(visitor); + + if(this.rootOfTerminationCriterion != null) { + this.rootOfTerminationCriterion.accept(visitor); + } 
+ } + + private void mergeBranchPlanMaps() { + for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) { + OptimizerNode brancher = desc.getBranchingNode(); + + if (branchPlan == null) { + branchPlan = new HashMap<OptimizerNode, PlanNode>(6); + } + + if (!branchPlan.containsKey(brancher)) { + PlanNode selectedCandidate = null; + + if (rootOfStepFunction.branchPlan != null) { + selectedCandidate = rootOfStepFunction.branchPlan.get(brancher); + } + + if (selectedCandidate == null) { + throw new CompilerException( + "Candidates for a node with open branches are missing information about the selected candidate "); + } + + this.branchPlan.put(brancher, selectedCandidate); + } + } + } +}