Repository: flink Updated Branches: refs/heads/master c56e3f10b -> bed3da4a6
[FLINK-1444][api-extending] Add support for attaching data properties to data sources This closes #379 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0a28bf5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0a28bf5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0a28bf5 Branch: refs/heads/master Commit: f0a28bf5345084a0a43df16021e60078e322e087 Parents: c56e3f1 Author: Fabian Hueske <[email protected]> Authored: Fri Feb 6 14:28:00 2015 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Feb 20 16:10:01 2015 +0100 ---------------------------------------------------------------------- .../flink/compiler/dag/DataSourceNode.java | 82 +- .../flink/compiler/plan/SourcePlanNode.java | 12 +- .../flink/compiler/PropertyDataSourceTest.java | 897 +++++++++++++++++++ .../common/operators/GenericDataSourceBase.java | 41 +- .../flink/api/java/io/SplitDataProperties.java | 464 ++++++++++ .../flink/api/java/operators/DataSource.java | 30 +- 6 files changed, 1517 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java index af2a92b..49946e0 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java @@ -24,19 +24,25 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.io.FileInputFormat; import 
org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.io.NonParallelInput; import org.apache.flink.api.common.io.ReplicatingInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.GenericDataSourceBase.SplitDataProperties; import org.apache.flink.api.common.operators.Operator; +import org.apache.flink.api.common.operators.Ordering; import org.apache.flink.api.common.operators.SemanticProperties; import org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties; +import org.apache.flink.api.common.operators.util.FieldList; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.costs.CostEstimator; import org.apache.flink.compiler.costs.Costs; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.LocalProperties; import org.apache.flink.compiler.plan.PlanNode; import org.apache.flink.compiler.plan.SourcePlanNode; import org.apache.flink.configuration.Configuration; @@ -51,6 +57,10 @@ public class DataSourceNode extends OptimizerNode { private final boolean replicatedInput; + private GlobalProperties gprops; + + private LocalProperties lprops; + /** * Creates a new DataSourceNode for the given contract. 
* @@ -76,6 +86,20 @@ public class DataSourceNode extends OptimizerNode { } else { this.replicatedInput = false; } + + this.gprops = new GlobalProperties(); + this.lprops = new LocalProperties(); + + SplitDataProperties<?> splitProps = pactContract.getSplitDataProperties(); + + if(replicatedInput) { + this.gprops.setFullyReplicated(); + this.lprops = new LocalProperties(); + } else if (splitProps != null) { + // configure data properties of data source using split properties + setDataPropertiesFromSplitProperties(splitProps); + } + } /** @@ -184,7 +208,8 @@ public class DataSourceNode extends OptimizerNode { return this.cachedPlans; } - SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")"); + SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getPactContract().getName()+")", + this.gprops, this.lprops); if(!replicatedInput) { candidate.updatePropertiesWithUniqueSets(getUniqueFields()); @@ -205,8 +230,6 @@ public class DataSourceNode extends OptimizerNode { estimator.addFileInputCost(this.estimatedOutputSize * this.getDegreeOfParallelism(), costs); } candidate.setCosts(costs); - - candidate.getGlobalProperties().setFullyReplicated(); } // since there is only a single plan for the data-source, return a list with that element only @@ -228,4 +251,57 @@ public class DataSourceNode extends OptimizerNode { visitor.postVisit(this); } } + + private void setDataPropertiesFromSplitProperties(SplitDataProperties splitProps) { + + // set global properties + int[] partitionKeys = splitProps.getSplitPartitionKeys(); + Partitioner<?> partitioner = splitProps.getSplitPartitioner(); + + if(partitionKeys != null && partitioner != null) { + this.gprops.setCustomPartitioned(new FieldList(partitionKeys), partitioner); + } + else if(partitionKeys != null) { + this.gprops.setAnyPartitioning(new FieldList(partitionKeys)); + } + // set local properties + int[] groupingKeys = splitProps.getSplitGroupKeys(); + Ordering 
ordering = splitProps.getSplitOrder(); + + // more than one split per source tasks possible. + // adapt split grouping and sorting + if(ordering != null) { + + // sorting falls back to grouping because a source can read multiple, + // randomly assigned splits + groupingKeys = ordering.getFieldPositions(); + } + + if(groupingKeys != null && partitionKeys != null) { + // check if grouping is also valid across splits, i.e., whether grouping keys are + // valid superset of partition keys + boolean allFieldsIncluded = true; + for(int i : partitionKeys) { + boolean fieldIncluded = false; + for(int j : groupingKeys) { + if(i == j) { + fieldIncluded = true; + break; + } + } + if(!fieldIncluded) { + allFieldsIncluded = false; + break; + } + } + if (allFieldsIncluded) { + this.lprops = LocalProperties.forGrouping(new FieldList(groupingKeys)); + } else { + this.lprops = new LocalProperties(); + } + + } else { + this.lprops = new LocalProperties(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java index 891345d..813feb3 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SourcePlanNode.java @@ -44,13 +44,17 @@ public class SourcePlanNode extends PlanNode { * @param template The template optimizer node that this candidate is created for. 
*/ public SourcePlanNode(DataSourceNode template, String nodeName) { + this(template, nodeName, new GlobalProperties(), new LocalProperties()); + } + + public SourcePlanNode(DataSourceNode template, String nodeName, GlobalProperties gprops, LocalProperties lprops) { super(template, nodeName, DriverStrategy.NONE); - - this.globalProps = new GlobalProperties(); - this.localProps = new LocalProperties(); + + this.globalProps = gprops; + this.localProps = lprops; updatePropertiesWithUniqueSets(template.getUniqueFields()); } - + // -------------------------------------------------------------------------------------------- public DataSourceNode getDataSourceNode() { http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java new file mode 100644 index 0000000..7b023e5 --- /dev/null +++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PropertyDataSourceTest.java @@ -0,0 +1,897 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler; + +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.util.FieldSet; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.operators.translation.JavaPlan; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.compiler.dataproperties.GlobalProperties; +import org.apache.flink.compiler.dataproperties.LocalProperties; +import org.apache.flink.compiler.dataproperties.PartitioningProperty; +import org.apache.flink.compiler.plan.NAryUnionPlanNode; +import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.SinkPlanNode; +import org.apache.flink.compiler.plan.SourcePlanNode; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +@SuppressWarnings({"serial"}) +public class PropertyDataSourceTest extends CompilerTestBase { + + private List<Tuple3<Long, SomePojo, String>> tuple3PojoData = new ArrayList<Tuple3<Long, SomePojo, String>>(); + private TupleTypeInfo<Tuple3<Long, SomePojo, String>> tuple3PojoType = new TupleTypeInfo<Tuple3<Long, SomePojo, String>>( + BasicTypeInfo.LONG_TYPE_INFO, + TypeExtractor.createTypeInfo(SomePojo.class), + BasicTypeInfo.STRING_TYPE_INFO + ); + + @Test + public void checkSinglePartitionedSource1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + 
+ data.getSplitDataProperties() + .splitsPartitionedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1, 0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource3() { + + 
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("*"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1, 2, 3, 4))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource4() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1, 2, 3))); + 
Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource5() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.stringField"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(3))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource6() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField; f2"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) 
sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2, 4))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedSource7() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy("byDate", 1, 0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING); + Assert.assertTrue(gprops.getCustomPartitioner() != null); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + 
.splitsPartitionedBy(0) + .splitsGroupedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0) + .splitsGroupedBy(1, 0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0, 1))); + 
Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedGroupedSource3() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1) + .splitsGroupedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource4() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0, 1) + .splitsGroupedBy(0); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = 
sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(0, 1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource5() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f2") + .splitsGroupedBy("f2"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(4))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(4))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedGroupedSource6() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsGroupedBy("f0; f1.intField"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + 
+ // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(0,2))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedGroupedSource7() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsGroupedBy("f1"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(2))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(new FieldSet(lprops.getGroupedFields().toArray()).equals(new FieldSet(1,2,3))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedGroupedSource8() { + + 
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1") + .splitsGroupedBy("f1.stringField"); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1,2,3))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue(lprops.getGroupedFields() == null); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedOrderedSource1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1) + .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new 
FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + @Test + public void checkSinglePartitionedOrderedSource2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(1) + .splitsOrderedBy(new int[]{1, 0}, new Order[]{Order.ASCENDING, Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertTrue((new FieldSet(gprops.getPartitioningFields().toArray())).equals(new FieldSet(1))); + Assert.assertTrue(gprops.getPartitioning() == PartitioningProperty.ANY_PARTITIONING); + Assert.assertTrue((new FieldSet(lprops.getGroupedFields().toArray())).equals(new FieldSet(1, 0))); + Assert.assertTrue(lprops.getOrdering() == null); + + } + + + @Test + public void checkSinglePartitionedOrderedSource3() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0) + .splitsOrderedBy(new int[]{1}, new Order[]{Order.ASCENDING}); + + 
data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertEquals(new FieldSet(0), new FieldSet(gprops.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning()); + Assert.assertNull(lprops.getGroupedFields()); + Assert.assertNull(lprops.getOrdering()); + + } + + @Test + public void checkSinglePartitionedOrderedSource4() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data.getSplitDataProperties() + .splitsPartitionedBy(0, 1) + .splitsOrderedBy(new int[]{1}, new Order[]{Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertEquals(new FieldSet(0, 1), new FieldSet(gprops.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning()); + Assert.assertNull(lprops.getGroupedFields()); + Assert.assertNull(lprops.getOrdering()); + + } + + @Test + public void checkSinglePartitionedOrderedSource5() { +
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); // flat field positions: f0 -> 0, f1 (SomePojo) -> 1..3, f1.intField -> 2 + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsOrderedBy("f0; f1.intField", new Order[]{Order.ASCENDING, Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertEquals(new FieldSet(2), new FieldSet(gprops.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning()); + Assert.assertEquals(new FieldSet(0, 2), new FieldSet(lprops.getGroupedFields().toArray())); + Assert.assertNull(lprops.getOrdering()); + + } + + + @Test + public void checkSinglePartitionedOrderedSource6() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1.intField") + .splitsOrderedBy("f1", new Order[]{Order.DESCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); +
LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertEquals(new FieldSet(2), new FieldSet(gprops.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning()); + Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(lprops.getGroupedFields().toArray())); + Assert.assertNull(lprops.getOrdering()); + + } + + @Test + public void checkSinglePartitionedOrderedSource7() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple3<Long, SomePojo, String>> data = env.fromCollection(tuple3PojoData, tuple3PojoType); + + data.getSplitDataProperties() + .splitsPartitionedBy("f1") + .splitsOrderedBy("f1.stringField", new Order[]{Order.ASCENDING}); + + data.print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode = (SourcePlanNode) sinkNode.getPredecessor(); + + GlobalProperties gprops = sourceNode.getGlobalProperties(); + LocalProperties lprops = sourceNode.getLocalProperties(); + + Assert.assertEquals(new FieldSet(1, 2, 3), new FieldSet(gprops.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.ANY_PARTITIONING, gprops.getPartitioning()); + Assert.assertNull(lprops.getGroupedFields()); + Assert.assertNull(lprops.getOrdering()); + + } + + + @Test + public void checkCoPartitionedSources1() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data1 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data1.getSplitDataProperties() + .splitsPartitionedBy("byDate", 0); + + DataSource<Tuple2<Long, String>>
data2 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data2.getSplitDataProperties() + .splitsPartitionedBy("byDate", 0); + + data1.union(data2).print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource(); + SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource(); + + GlobalProperties gprops1 = sourceNode1.getGlobalProperties(); + LocalProperties lprops1 = sourceNode1.getLocalProperties(); + GlobalProperties gprops2 = sourceNode2.getGlobalProperties(); + LocalProperties lprops2 = sourceNode2.getLocalProperties(); + + Assert.assertEquals(new FieldSet(0), new FieldSet(gprops1.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops1.getPartitioning()); + Assert.assertNull(lprops1.getGroupedFields()); + Assert.assertNull(lprops1.getOrdering()); + + Assert.assertEquals(new FieldSet(0), new FieldSet(gprops2.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops2.getPartitioning()); + Assert.assertNull(lprops2.getGroupedFields()); + Assert.assertNull(lprops2.getOrdering()); + + Assert.assertEquals(gprops1.getCustomPartitioner(), gprops2.getCustomPartitioner()); // same method id "byDate" -> equal partitioner markers + } + + @Test + public void checkCoPartitionedSources2() { + + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); + env.setDegreeOfParallelism(DEFAULT_PARALLELISM); + + DataSource<Tuple2<Long, String>> data1 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data1.getSplitDataProperties() +
.splitsPartitionedBy("byCountry", 0); + + DataSource<Tuple2<Long, String>> data2 = + env.readCsvFile("/some/path").types(Long.class, String.class); + + data2.getSplitDataProperties() + .splitsPartitionedBy("byDate", 0); + + data1.union(data2).print(); + + JavaPlan plan = env.createProgramPlan(); + + // submit the plan to the compiler + OptimizedPlan oPlan = compileNoStats(plan); + + // check the optimized Plan + SinkPlanNode sinkNode = oPlan.getDataSinks().iterator().next(); + SourcePlanNode sourceNode1 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(0).getSource(); + SourcePlanNode sourceNode2 = (SourcePlanNode) ((NAryUnionPlanNode)sinkNode.getPredecessor()).getListOfInputs().get(1).getSource(); + + GlobalProperties gprops1 = sourceNode1.getGlobalProperties(); + LocalProperties lprops1 = sourceNode1.getLocalProperties(); + GlobalProperties gprops2 = sourceNode2.getGlobalProperties(); + LocalProperties lprops2 = sourceNode2.getLocalProperties(); + + Assert.assertEquals(new FieldSet(0), new FieldSet(gprops1.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops1.getPartitioning()); + Assert.assertNull(lprops1.getGroupedFields()); + Assert.assertNull(lprops1.getOrdering()); + + Assert.assertEquals(new FieldSet(0), new FieldSet(gprops2.getPartitioningFields().toArray())); + Assert.assertEquals(PartitioningProperty.CUSTOM_PARTITIONING, gprops2.getPartitioning()); + Assert.assertNull(lprops2.getGroupedFields()); + Assert.assertNull(lprops2.getOrdering()); + + Assert.assertFalse(gprops1.getCustomPartitioner().equals(gprops2.getCustomPartitioner())); // different method ids ("byCountry" vs. "byDate") -> unequal markers + } + + + public static class SomePojo { + public double doubleField; + public int intField; + public String stringField; + } + +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java index 13c5dad..912d13d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; @@ -45,6 +46,9 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O protected String statisticsKey; + /** The data properties of this data source's splits, or null if no properties have been set. */ + private SplitDataProperties<OUT> splitProperties; + /** * Creates a new instance for the given file using the given input format. * @@ -157,7 +161,30 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O public void setStatisticsKey(String statisticsKey) { this.statisticsKey = statisticsKey; } - + + /** + * Sets properties of input splits for this data source. + * Split properties can help to generate more efficient execution plans. + * <br> + * <b> + * IMPORTANT: Providing wrong split data properties can cause wrong results! + * </b> + * + * @param splitDataProperties The data properties of this data source's splits. + */ + public void setSplitDataProperties(SplitDataProperties<OUT> splitDataProperties) { + this.splitProperties = splitDataProperties; + } + + /** + * Returns the data properties of this data source's splits. + * + * @return The data properties of this data source's splits or null if no properties have been set.
+ */ + public SplitDataProperties<OUT> getSplitDataProperties() { + return this.splitProperties; + } + // -------------------------------------------------------------------------------------------- /** @@ -209,4 +236,27 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O public String toString() { return this.name; } + + + /** + * Data properties of the input splits of a data source. + * Split properties can help to generate more efficient execution plans. + * + * @param <T> The type of the records produced by the data source. + */ + public static interface SplitDataProperties<T> { + + /** Returns the positions of the fields on which the data is partitioned across splits, or null. */ + public int[] getSplitPartitionKeys(); + + /** Returns the partitioner that identifies how the data is partitioned across splits, or null. */ + public Partitioner<T> getSplitPartitioner(); + + /** Returns the positions of the fields on which the data within each split is grouped, or null. */ + public int[] getSplitGroupKeys(); + + /** Returns the ordering of the data within each split, or null. */ + public Ordering getSplitOrder(); + + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java new file mode 100644 index 0000000..04d9953 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java @@ -0,0 +1,477 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.api.java.io; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.operators.GenericDataSourceBase; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.operators.Ordering; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.operators.Keys; + +import java.util.Arrays; + +/** + * SplitDataProperties define data properties on {@link org.apache.flink.core.io.InputSplit}s + * generated by the {@link org.apache.flink.api.common.io.InputFormat} of a {@link DataSource}. + * <p> + * InputSplits are units of input which are distributed among and assigned to parallel data source subtasks. + * SplitDataProperties can define that the elements which are generated by the associated InputFormat + * are + * <ul> + * <li>Partitioned on one or more fields across InputSplits, i.e., all elements with the same + * (combination of) key(s) are located in the same input split.</li> + * <li>Grouped on one or more fields within an InputSplit, i.e., all elements of an input split + * that have the same (combination of) key(s) are emitted in a single sequence one after the other.</li> + * <li>Ordered on one or more fields within an InputSplit, i.e., all elements within an input split + * are in the defined order.</li> + * </ul> + * <p> + * <b>IMPORTANT: SplitDataProperties can improve the execution of a program because certain + * data reorganization steps such as shuffling or sorting can be avoided. + * HOWEVER, if SplitDataProperties are not correctly defined, the result of the program might be wrong!</b> + * + * @param <T> The type of the DataSource on which the SplitDataProperties are defined.
+ * + * @see org.apache.flink.core.io.InputSplit + * @see org.apache.flink.api.common.io.InputFormat + * @see org.apache.flink.api.java.operators.DataSource + */ +public class SplitDataProperties<T> implements GenericDataSourceBase.SplitDataProperties<T> { + + private final TypeInformation<T> type; + + private int[] splitPartitionKeys; + + private Partitioner<T> splitPartitioner; + + private int[] splitGroupKeys; + + private Ordering splitOrdering; + + /** + * Creates SplitDataProperties for the given data type. + * + * @param type The data type of the SplitDataProperties. + */ + public SplitDataProperties(TypeInformation<T> type) { + this.type = type; + } + + /** + * Creates SplitDataProperties for the given data source, using the data source's type. + * + * @param source The DataSource for which the SplitDataProperties are created. + */ + public SplitDataProperties(DataSource<T> source) { + this.type = source.getType(); + } + + /** + * Defines that data is partitioned across input splits on the fields defined by field positions. + * All records sharing the same key (combination) must be contained in a single input split. + * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param partitionFields The field positions of the partitioning keys. + * @return This SplitDataProperties object. + */ + public SplitDataProperties<T> splitsPartitionedBy(int... partitionFields) { + return this.splitsPartitionedBy(null, partitionFields); + } + + /** + * Defines that data is partitioned using a specific partitioning method + * across input splits on the fields defined by field positions. + * All records sharing the same key (combination) must be contained in a single input split. + * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param partitionMethodId An ID for the method that was used to partition the data across splits.
+ * @param partitionFields The field positions of the partitioning keys. + * @return This SplitDataProperties object. + */ + public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, int... partitionFields) { + + if (partitionFields == null) { + throw new InvalidProgramException("PartitionFields may not be null."); + } else if (partitionFields.length == 0) { + throw new InvalidProgramException("PartitionFields may not be empty."); + } + + this.splitPartitionKeys = getAllFlatKeys(partitionFields); + if (partitionMethodId != null) { + this.splitPartitioner = new SourcePartitionerMarker<T>(partitionMethodId); + } else { + this.splitPartitioner = null; + } + + return this; + } + + /** + * Defines that data is partitioned across input splits on the fields defined by field expressions. + * Multiple field expressions must be separated by the semicolon ';' character. + * All records sharing the same key (combination) must be contained in a single input split. + * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param partitionFields The field expressions of the partitioning keys. + * @return This SplitDataProperties object. + */ + public SplitDataProperties<T> splitsPartitionedBy(String partitionFields) { + return this.splitsPartitionedBy(null, partitionFields); + } + + /** + * Defines that data is partitioned using an identifiable method + * across input splits on the fields defined by field expressions. + * Multiple field expressions must be separated by the semicolon ';' character. + * All records sharing the same key (combination) must be contained in a single input split. + * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param partitionMethodId An ID for the method that was used to partition the data across splits. + * @param partitionFields The field expressions of the partitioning keys.
+ * @return This SplitDataProperties object. + */ + public SplitDataProperties<T> splitsPartitionedBy(String partitionMethodId, String partitionFields) { + + if(partitionFields == null) { + throw new InvalidProgramException("PartitionFields may not be null."); + } + + String[] partitionKeysA = partitionFields.split(";"); + if (partitionKeysA.length == 0 || partitionFields.trim().isEmpty()) { // "".split(";") yields {""}, not an empty array + throw new InvalidProgramException("PartitionFields may not be empty."); + } + + this.splitPartitionKeys = getAllFlatKeys(partitionKeysA); + if(partitionMethodId != null) { + this.splitPartitioner = new SourcePartitionerMarker<T>(partitionMethodId); + } else { + this.splitPartitioner = null; + } + + return this; + } + + /** + * Defines that the data within an input split is grouped on the fields defined by the field positions. + * All records sharing the same key (combination) must be subsequently emitted by the input + * format for each input split. + * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param groupFields The field positions of the grouping keys. + * @return This SplitDataProperties object. + */ + public SplitDataProperties<T> splitsGroupedBy(int... groupFields) { + + if(groupFields == null) { + throw new InvalidProgramException("GroupFields may not be null."); + } else if (groupFields.length == 0) { + throw new InvalidProgramException("GroupFields may not be empty."); + } + + if(this.splitOrdering != null) { + throw new InvalidProgramException("DataSource may either be grouped or sorted."); + } + + this.splitGroupKeys = getAllFlatKeys(groupFields); + + return this; + } + + /** + * Defines that the data within an input split is grouped on the fields defined by the field expressions. + * Multiple field expressions must be separated by the semicolon ';' character. + * All records sharing the same key (combination) must be subsequently emitted by the input + * format for each input split.
+ * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param groupFields The field expressions of the grouping keys. + * @return This SplitDataProperties object. + */ + public SplitDataProperties<T> splitsGroupedBy(String groupFields) { + + if(groupFields == null) { + throw new InvalidProgramException("GroupFields may not be null."); + } + + String[] groupKeysA = groupFields.split(";"); + if (groupKeysA.length == 0 || groupFields.trim().isEmpty()) { + throw new InvalidProgramException("GroupFields may not be empty."); + } + + if(this.splitOrdering != null) { + throw new InvalidProgramException("DataSource may either be grouped or sorted."); + } + + this.splitGroupKeys = getAllFlatKeys(groupKeysA); + + return this; + } + + /** + * Defines that the data within an input split is sorted on the fields defined by the field positions + * in the specified orders. + * All records of an input split must be emitted by the input format in the defined order. + * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param orderFields The field positions of the ordering keys. + * @param orders The orders of the fields. + * @return This SplitDataProperties object.
+ */ + public SplitDataProperties<T> splitsOrderedBy(int[] orderFields, Order[] orders) { + + if(orderFields == null || orders == null) { + throw new InvalidProgramException("OrderFields or Orders may not be null."); + } else if (orderFields.length == 0) { + throw new InvalidProgramException("OrderFields may not be empty."); + } else if (orders.length == 0) { + throw new InvalidProgramException("Orders may not be empty"); + } else if (orderFields.length != orders.length) { + throw new InvalidProgramException("Number of OrderFields and Orders must match."); + } + + if(this.splitGroupKeys != null) { + throw new InvalidProgramException("DataSource may either be grouped or sorted."); + } + + Ordering ordering = new Ordering(); // built locally, assigned only if the whole specification is valid + + for(int i=0; i<orderFields.length; i++) { + int pos = orderFields[i]; + int[] flatKeys = this.getAllFlatKeys(new int[]{pos}); + + for(int key : flatKeys) { + // check for duplicates + for (int okey : ordering.getFieldPositions()) { + if (key == okey) { + throw new InvalidProgramException("Duplicate field in the field expression " + pos); + } + } + // append key + ordering.appendOrdering(key, null, orders[i]); + } + } + this.splitOrdering = ordering; + return this; + } + + /** + * Defines that the data within an input split is sorted on the fields defined by the field expressions + * in the specified orders. Multiple field expressions must be separated by the semicolon ';' character. + * All records of an input split must be emitted by the input format in the defined order. + * <br> + * <b> + * IMPORTANT: Providing wrong information with SplitDataProperties can cause wrong results! + * </b> + * + * @param orderFields The field expressions of the ordering keys. + * @param orders The orders of the fields. + * @return This SplitDataProperties object.
+ */ + public SplitDataProperties<T> splitsOrderedBy(String orderFields, Order[] orders) { + + if(orderFields == null || orders == null) { + throw new InvalidProgramException("OrderFields or Orders may not be null."); + } + + String[] orderKeysA = orderFields.split(";"); + if (orderKeysA.length == 0 || orderFields.trim().isEmpty()) { + throw new InvalidProgramException("OrderFields may not be empty."); + } else if (orders.length == 0) { + throw new InvalidProgramException("Orders may not be empty"); + } else if (orderKeysA.length != orders.length) { + throw new InvalidProgramException("Number of OrderFields and Orders must match."); + } + + if(this.splitGroupKeys != null) { + throw new InvalidProgramException("DataSource may either be grouped or sorted."); + } + + Ordering ordering = new Ordering(); // built locally, assigned only if the whole specification is valid + + for(int i=0; i<orderKeysA.length; i++) { + String keyExp = orderKeysA[i]; + int[] flatKeys = this.computeFlatKeys(keyExp); + + for(int key : flatKeys) { + // check for duplicates + for (int okey : ordering.getFieldPositions()) { + if (key == okey) { + throw new InvalidProgramException("Duplicate field in field expression " + keyExp); + } + } + // append key + ordering.appendOrdering(key, null, orders[i]); + } + } + this.splitOrdering = ordering; + return this; + } + + @Override + public int[] getSplitPartitionKeys() { + return this.splitPartitionKeys; + } + + @Override + public Partitioner<T> getSplitPartitioner() { + return this.splitPartitioner; + } + + @Override + public int[] getSplitGroupKeys() { + return this.splitGroupKeys; + } + + @Override + public Ordering getSplitOrder() { + return this.splitOrdering; + } + + + /////////////////////// FLAT FIELD EXTRACTION METHODS + + /** Resolves the given field expressions to flat key positions, rejecting duplicate fields. */ + private int[] getAllFlatKeys(String[] fieldExpressions) { + + int[] allKeys = null; + + for(String keyExp : fieldExpressions) { + int[] flatKeys = this.computeFlatKeys(keyExp); + if(allKeys == null) { + allKeys = flatKeys; + } else { + // check for duplicates + for(int key1 : flatKeys) { + for(int key2 : allKeys) { + if(key1 == key2) { + throw new InvalidProgramException("Duplicate
fields in field expression "+keyExp); + } + } + } + // append flat keys + int oldLength = allKeys.length; + allKeys = Arrays.copyOf(allKeys, oldLength + flatKeys.length); + System.arraycopy(flatKeys, 0, allKeys, oldLength, flatKeys.length); + } + } + + return allKeys; + } + + /** Resolves the given field positions to flat (logical) key positions. */ + private int[] getAllFlatKeys(int[] fieldPositions) { + + Keys.ExpressionKeys<T> ek; + try { + ek = new Keys.ExpressionKeys<T>(fieldPositions, this.type); + } catch(IllegalArgumentException iae) { + throw new InvalidProgramException("Invalid specification of field expression.", iae); + } + return ek.computeLogicalKeyPositions(); + } + + + /** Resolves one field expression to flat (logical) key positions; non-composite types only accept a wildcard. */ + private int[] computeFlatKeys(String fieldExpression) { + + fieldExpression = fieldExpression.trim(); + + if(this.type instanceof CompositeType) { + // compute flat field positions for (nested) sorting fields + Keys.ExpressionKeys<T> ek; + try { + ek = new Keys.ExpressionKeys<T>(new String[]{fieldExpression}, this.type); + } catch(IllegalArgumentException iae) { + throw new InvalidProgramException("Invalid specification of field expression.", iae); + } + return ek.computeLogicalKeyPositions(); + } else { + if (!(fieldExpression.equals("*") || fieldExpression.equals("_"))) { + throw new InvalidProgramException("Data properties on non-composite types can only be defined on the full type. " + + "Use a field wildcard for that (\"*\" or \"_\")"); + } else { + return new int[]{0}; + } + } + } + + /** + * A custom partitioner to mark compatible split partitionings. + * + * @param <T> The type of the partitioned data.
+ */ + public static class SourcePartitionerMarker<T> implements Partitioner<T> { + + final String partitionMarker; + + public SourcePartitionerMarker(String partitionMarker) { + if (partitionMarker == null) { + throw new IllegalArgumentException("The partition marker must not be null."); + } + this.partitionMarker = partitionMarker; + } + + @Override + public int partition(T key, int numPartitions) { + throw new UnsupportedOperationException("The SourcePartitionerMarker is only used as a marker for compatible partitioning. " + + "It must not be invoked."); + } + + @Override + public boolean equals(Object o) { + if(o instanceof SourcePartitionerMarker) { + return this.partitionMarker.equals(((SourcePartitionerMarker<?>) o).partitionMarker); + } else { + return false; + } + } + + @Override + public int hashCode() { + // must be consistent with equals(), which compares only the marker string + return this.partitionMarker.hashCode(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/f0a28bf5/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java index 2352269..d6e511a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.api.common.operators.OperatorInformation; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.SplitDataProperties; import org.apache.flink.configuration.Configuration; /** @@ -41,6 +42,8 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> { private Configuration parameters; + private SplitDataProperties<OUT> splitDataProperties; + // -------------------------------------------------------------------------------------------- /** @@ -90,7 +93,28 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> { public
Configuration getParameters() { return this.parameters; } - + + + /** + * Returns the {@link org.apache.flink.api.java.io.SplitDataProperties} of the + * {@link org.apache.flink.core.io.InputSplit}s of this DataSource, which can be used to declare + * data properties of the splits. The object is created on first access and reused afterwards. + * <p> + * SplitDataProperties can help to generate more efficient execution plans. + * <br> + * <b> + * IMPORTANT: Incorrect configuration of SplitDataProperties can cause wrong results! + * </b> + * + * @return The SplitDataProperties for the InputSplits of this DataSource. + */ + public SplitDataProperties<OUT> getSplitDataProperties() { + if(this.splitDataProperties == null) { + this.splitDataProperties = new SplitDataProperties<OUT>(this); + } + return this.splitDataProperties; + } + // -------------------------------------------------------------------------------------------- protected GenericDataSourceBase<OUT, ?> translateToDataFlow() { @@ -106,6 +130,10 @@ public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> { if(this.parameters != null) { source.getParameters().addAll(this.parameters); } + if(this.splitDataProperties != null) { + source.setSplitDataProperties(this.splitDataProperties); + } return source; } + }
