Repository: flink Updated Branches: refs/heads/master 8bee54395 -> 53d6582d3
http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index 2ad133c..ccd6139 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -38,23 +38,27 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.util.Preconditions; +/** + * A {@link Grouping} that is unsorted. + * @param <T> type of elements + */ @Public public class UnsortedGrouping<T> extends Grouping<T> { public UnsortedGrouping(DataSet<T> set, Keys<T> keys) { super(set, keys); } - + /** * Uses a custom partitioner for the grouping. - * + * * @param partitioner The custom partitioner. * @return The grouping object itself, to allow for method chaining. */ public UnsortedGrouping<T> withPartitioner(Partitioner<?> partitioner) { Preconditions.checkNotNull(partitioner); getKeys().validateCustomPartitioner(partitioner, null); - + this.customPartitioner = partitioner; return this; } @@ -62,18 +66,19 @@ public class UnsortedGrouping<T> extends Grouping<T> { // -------------------------------------------------------------------------------------------- // Operations / Transformations // -------------------------------------------------------------------------------------------- - + /** - * Applies an Aggregate transformation on a grouped {@link org.apache.flink.api.java.tuple.Tuple} {@link DataSet}.<br> - * <b>Note: Only Tuple DataSets can be aggregated.</b> - * The transformation applies a built-in {@link Aggregations Aggregation} on a specified field - * of a Tuple group. Additional aggregation functions can be added to the resulting + * Applies an Aggregate transformation on a grouped {@link org.apache.flink.api.java.tuple.Tuple} {@link DataSet}. + * + * <p><b>Note: Only Tuple DataSets can be aggregated.</b> + * The transformation applies a built-in {@link Aggregations Aggregation} on a specified field + * of a Tuple group. Additional aggregation functions can be added to the resulting * {@link AggregateOperator} by calling {@link AggregateOperator#and(Aggregations, int)}. - * + * * @param agg The built-in aggregation function that is computed. * @param field The index of the Tuple field on which the aggregation function is applied. - * @return An AggregateOperator that represents the aggregated DataSet. - * + * @return An AggregateOperator that represents the aggregated DataSet. + * * @see org.apache.flink.api.java.tuple.Tuple * @see Aggregations * @see AggregateOperator @@ -82,14 +87,14 @@ public class UnsortedGrouping<T> extends Grouping<T> { public AggregateOperator<T> aggregate(Aggregations agg, int field) { return aggregate(agg, field, Utils.getCallLocationName()); } - + // private helper that allows to set a different call location name private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) { return new AggregateOperator<T>(this, agg, field, callLocationName); } /** - * Syntactic sugar for aggregate (SUM, field) + * Syntactic sugar for aggregate (SUM, field). * @param field The index of the Tuple field on which the aggregation function is applied. * @return An AggregateOperator that represents the summed DataSet. * @@ -100,7 +105,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { } /** - * Syntactic sugar for aggregate (MAX, field) + * Syntactic sugar for aggregate (MAX, field). * @param field The index of the Tuple field on which the aggregation function is applied. * @return An AggregateOperator that represents the max'ed DataSet. * @@ -111,7 +116,7 @@ public class UnsortedGrouping<T> extends Grouping<T> { } /** - * Syntactic sugar for aggregate (MIN, field) + * Syntactic sugar for aggregate (MIN, field). * @param field The index of the Tuple field on which the aggregation function is applied. * @return An AggregateOperator that represents the min'ed DataSet. * @@ -120,16 +125,17 @@ public class UnsortedGrouping<T> extends Grouping<T> { public AggregateOperator<T> min (int field) { return this.aggregate (Aggregations.MIN, field, Utils.getCallLocationName()); } - + /** - * Applies a Reduce transformation on a grouped {@link DataSet}.<br> - * For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} - * until only a single element for each group remains. + * Applies a Reduce transformation on a grouped {@link DataSet}. + * + * <p>For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} + * until only a single element for each group remains. * A ReduceFunction combines two elements into one new element of the same type. - * + * * @param reducer The ReduceFunction that is applied on each group of the DataSet. * @return A ReduceOperator that represents the reduced DataSet. - * + * * @see org.apache.flink.api.common.functions.RichReduceFunction * @see ReduceOperator * @see DataSet @@ -140,16 +146,17 @@ public class UnsortedGrouping<T> extends Grouping<T> { } return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName()); } - + /** - * Applies a GroupReduce transformation on a grouped {@link DataSet}.<br> - * The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} for each group of the DataSet. + * Applies a GroupReduce transformation on a grouped {@link DataSet}. + * + * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichGroupReduceFunction} for each group of the DataSet. * A GroupReduceFunction can iterate over all elements of a group and emit any * number of output elements including none. - * + * * @param reducer The GroupReduceFunction that is applied on each group of the DataSet. * @return A GroupReduceOperator that represents the reduced DataSet. - * + * * @see org.apache.flink.api.common.functions.RichGroupReduceFunction * @see GroupReduceOperator * @see DataSet @@ -187,72 +194,77 @@ public class UnsortedGrouping<T> extends Grouping<T> { } /** - * Returns a new set containing the first n elements in this grouped {@link DataSet}.<br> + * Returns a new set containing the first n elements in this grouped {@link DataSet}. + * * @param n The desired number of elements for each group. * @return A GroupReduceOperator that represents the DataSet containing the elements. */ public GroupReduceOperator<T, T> first(int n) { - if(n < 1) { + if (n < 1) { throw new InvalidProgramException("Parameter n of first(n) must be at least 1."); } - + return reduceGroup(new FirstReducer<T>(n)); } /** - * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.<br> - * The transformation consecutively calls a {@link ReduceFunction} + * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}. + * + * <p>The transformation consecutively calls a {@link ReduceFunction} * until only a single element remains which is the result of the transformation. * A ReduceFunction combines two elements into one new element of the same type. - * + * * @param fields Keys taken into account for finding the minimum. * @return A {@link ReduceOperator} representing the minimum. */ @SuppressWarnings({ "unchecked", "rawtypes" }) public ReduceOperator<T> minBy(int... fields) { - + // Check for using a tuple - if(!this.inputDataSet.getType().isTupleType()) { + if (!this.inputDataSet.getType().isTupleType()) { throw new InvalidProgramException("Method minBy(int) only works on tuples."); } - + return new ReduceOperator<T>(this, new SelectByMinFunction( (TupleTypeInfo) this.inputDataSet.getType(), fields), Utils.getCallLocationName()); } - + /** - * Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.<br> - * The transformation consecutively calls a {@link ReduceFunction} + * Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}. + * + * <p>The transformation consecutively calls a {@link ReduceFunction} * until only a single element remains which is the result of the transformation. * A ReduceFunction combines two elements into one new element of the same type. - * + * * @param fields Keys taken into account for finding the minimum. * @return A {@link ReduceOperator} representing the minimum. */ @SuppressWarnings({ "unchecked", "rawtypes" }) public ReduceOperator<T> maxBy(int... fields) { - + // Check for using a tuple - if(!this.inputDataSet.getType().isTupleType()) { + if (!this.inputDataSet.getType().isTupleType()) { throw new InvalidProgramException("Method maxBy(int) only works on tuples."); } - + return new ReduceOperator<T>(this, new SelectByMaxFunction( (TupleTypeInfo) this.inputDataSet.getType(), fields), Utils.getCallLocationName()); } // -------------------------------------------------------------------------------------------- // Group Operations // -------------------------------------------------------------------------------------------- - + /** - * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.<br> - * <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br> - * Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls. - * + * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}. + * + * <p><b>Note: Only groups of Tuple elements and Pojos can be sorted.</b> + * + * <p>Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls. + * * @param field The Tuple field on which the group is sorted. * @param order The Order in which the specified Tuple field is sorted. * @return A SortedGrouping with specified order of group element. - * + * * @see org.apache.flink.api.java.tuple.Tuple * @see Order */ @@ -265,16 +277,18 @@ public class UnsortedGrouping<T> extends Grouping<T> { sg.customPartitioner = getCustomPartitioner(); return sg; } - + /** - * Sorts Pojos within a group on the specified field in the specified {@link Order}.<br> - * <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br> - * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls. - * + * Sorts Pojos within a group on the specified field in the specified {@link Order}. + * + * <p><b>Note: Only groups of Tuple elements and Pojos can be sorted.</b> + * + * <p>Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls. + * * @param field The Tuple or Pojo field on which the group is sorted. * @param order The Order in which the specified field is sorted. * @return A SortedGrouping with specified order of group element. - * + * * @see Order */ public SortedGrouping<T> sortGroup(String field, Order order) { @@ -289,8 +303,9 @@ public class UnsortedGrouping<T> extends Grouping<T> { /** * Sorts elements within a group on a key extracted by the specified {@link org.apache.flink.api.java.functions.KeySelector} - * in the specified {@link Order}.<br> - * Chaining {@link #sortGroup(KeySelector, Order)} calls is not supported. + * in the specified {@link Order}. + * + * <p>Chaining {@link #sortGroup(KeySelector, Order)} calls is not supported. * * @param keySelector The KeySelector with which the group is sorted. * @param order The Order in which the extracted key is sorted. @@ -308,5 +323,5 @@ public class UnsortedGrouping<T> extends Grouping<T> { sg.customPartitioner = getCustomPartitioner(); return sg; } - + } http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java index a474783..770e0e8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinOperatorSetsBase.java @@ -22,6 +22,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.functions.FlatJoinFunction; import org.apache.flink.api.common.functions.JoinFunction; +import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -29,13 +30,13 @@ import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin; import org.apache.flink.api.java.operators.JoinOperator.EquiJoin; -import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TypeExtractor; /** - * Intermediate step of an Outer Join transformation. <br> - * To continue the Join transformation, select the join key of the first input {@link DataSet} by calling + * Intermediate step of an Outer Join transformation. + * + * <p>To continue the Join transformation, select the join key of the first input {@link DataSet} by calling * {@link JoinOperatorSetsBase#where(int...)} or * {@link JoinOperatorSetsBase#where(KeySelector)}. * @@ -71,9 +72,11 @@ public class JoinOperatorSetsBase<I1, I2> { } /** - * Continues a Join transformation. <br> - * Defines the {@link Tuple} fields of the first join {@link DataSet} that should be used as join keys.<br> - * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br> + * Continues a Join transformation. + * + * <p>Defines the {@link Tuple} fields of the first join {@link DataSet} that should be used as join keys. + * + * <p><b>Note: Fields can only be selected as join keys on Tuple DataSets.</b> * * @param fields The indexes of the other Tuple fields of the first join DataSets that should be used as keys. * @return An incomplete Join transformation. @@ -89,8 +92,9 @@ public class JoinOperatorSetsBase<I1, I2> { } /** - * Continues a Join transformation. <br> - * Defines the fields of the first join {@link DataSet} that should be used as grouping keys. Fields + * Continues a Join transformation. + * + * <p>Defines the fields of the first join {@link DataSet} that should be used as grouping keys. Fields * are the names of member fields of the underlying type of the data set. * * @param fields The fields of the first join DataSets that should be used as keys. @@ -107,9 +111,10 @@ public class JoinOperatorSetsBase<I1, I2> { } /** - * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}.<br> - * The KeySelector function is called for each element of the first DataSet and extracts a single - * key value on which the DataSet is joined. <br> + * Continues a Join transformation and defines a {@link KeySelector} function for the first join {@link DataSet}. + * + * <p>The KeySelector function is called for each element of the first DataSet and extracts a single + * key value on which the DataSet is joined. * * @param keySelector The KeySelector function which extracts the key values from the DataSet on which it is joined. * @return An incomplete Join transformation. @@ -125,10 +130,10 @@ public class JoinOperatorSetsBase<I1, I2> { return new JoinOperatorSetsPredicateBase(new Keys.SelectorFunctionKeys<>(keySelector, input1.getType(), keyType)); } - /** - * Intermediate step of a Join transformation. <br> - * To continue the Join transformation, select the join key of the second input {@link DataSet} by calling + * Intermediate step of a Join transformation. + * + * <p>To continue the Join transformation, select the join key of the second input {@link DataSet} by calling * {@link org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.JoinOperatorSetsPredicateBase#equalTo(int...)} or * {@link org.apache.flink.api.java.operators.join.JoinOperatorSetsBase.JoinOperatorSetsPredicateBase#equalTo(KeySelector)}. * @@ -151,10 +156,11 @@ public class JoinOperatorSetsBase<I1, I2> { /** * Continues a Join transformation and defines the {@link Tuple} fields of the second join - * {@link DataSet} that should be used as join keys.<br> - * <b>Note: Fields can only be selected as join keys on Tuple DataSets.</b><br> + * {@link DataSet} that should be used as join keys. * - * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a + * <p><b>Note: Fields can only be selected as join keys on Tuple DataSets.</b> + * + * <p>The resulting {@link JoinFunctionAssigner} needs to be finished by providing a * {@link JoinFunction} by calling {@link JoinFunctionAssigner#with(JoinFunction)} * * @param fields The indexes of the Tuple fields of the second join DataSet that should be used as keys. @@ -166,9 +172,9 @@ public class JoinOperatorSetsBase<I1, I2> { /** * Continues a Join transformation and defines the fields of the second join - * {@link DataSet} that should be used as join keys.<br> + * {@link DataSet} that should be used as join keys. * - * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a + * <p>The resulting {@link JoinFunctionAssigner} needs to be finished by providing a * {@link JoinFunction} by calling {@link JoinFunctionAssigner#with(JoinFunction)} * * @param fields The fields of the second join DataSet that should be used as keys. @@ -179,11 +185,12 @@ public class JoinOperatorSetsBase<I1, I2> { } /** - * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}.<br> - * The KeySelector function is called for each element of the second DataSet and extracts a single - * key value on which the DataSet is joined. <br> + * Continues a Join transformation and defines a {@link KeySelector} function for the second join {@link DataSet}. + * + * <p>The KeySelector function is called for each element of the second DataSet and extracts a single + * key value on which the DataSet is joined. * - * The resulting {@link JoinFunctionAssigner} needs to be finished by providing a + * <p>The resulting {@link JoinFunctionAssigner} needs to be finished by providing a * {@link JoinFunction} by calling {@link JoinFunctionAssigner#with(JoinFunction)} * * @param keySelector The KeySelector function which extracts the key values from the second DataSet on which it is joined. @@ -211,7 +218,7 @@ public class JoinOperatorSetsBase<I1, I2> { try { keys1.areCompatible(keys2); } catch (Keys.IncompatibleKeysException e) { - throw new InvalidProgramException("The pair of join keys are not compatible with each other.",e); + throw new InvalidProgramException("The pair of join keys are not compatible with each other.", e); } return new DefaultJoin<>(input1, input2, keys1, keys2, joinHint, Utils.getCallLocationName(4), joinType); } http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java index 58866de..2a668a8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/join/JoinType.java @@ -20,6 +20,9 @@ package org.apache.flink.api.java.operators.join; import org.apache.flink.annotation.Public; +/** + * Join types. + */ @Public public enum JoinType { http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java index 09a705c..5597b8f 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/NamesTest.java @@ -18,11 +18,6 @@ package org.apache.flink.api.java.operators; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatJoinFunction; @@ -35,9 +30,15 @@ import org.apache.flink.api.java.operators.translation.PlanFilterOperator; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.util.Collector; import org.apache.flink.util.Visitor; + import org.junit.Assert; import org.junit.Test; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + /** * Test proper automated assignment of the transformation's name, if not set by the user. */ @@ -50,7 +51,6 @@ public class NamesTest implements Serializable { DataSet<String> strs = env.fromCollection(Arrays.asList("a", "b")); - // WARNING: The test will fail if this line is being moved down in the file (the line-number is hard-coded) strs.filter(new FilterFunction<String>() { private static final long serialVersionUID = 1L; @@ -92,8 +92,7 @@ public class NamesTest implements Serializable { DataSet<Tuple1<String>> strs1 = env.fromCollection(strLi); strs.join(strs1).where(0).equalTo(0).with(new FlatJoinFunction<Tuple1<String>, Tuple1<String>, String>() { @Override - public void join(Tuple1<String> first, Tuple1<String> second, - Collector<String> out) throws Exception { + public void join(Tuple1<String> first, Tuple1<String> second, Collector<String> out) throws Exception { // } }) @@ -102,11 +101,12 @@ public class NamesTest implements Serializable { plan.accept(new Visitor<Operator<?>>() { @Override public boolean preVisit(Operator<?> visitable) { - if(visitable instanceof InnerJoinOperatorBase) { + if (visitable instanceof InnerJoinOperatorBase) { Assert.assertEquals("Join at testJoinWith(NamesTest.java:93)", visitable.getName()); } return true; } + @Override public void postVisit(Operator<?> visitable) {} }); @@ -116,7 +116,7 @@ public class NamesTest implements Serializable { plan.accept(new Visitor<Operator<?>>() { @Override public boolean preVisit(Operator<?> visitable) { - if(visitable instanceof PlanFilterOperator<?>) { + if (visitable instanceof PlanFilterOperator<?>) { // cast is actually not required. Its just a check for the right element PlanFilterOperator<?> filterOp = (PlanFilterOperator<?>) visitable; Assert.assertEquals(expected, filterOp.getName()); http://git-wip-us.apache.org/repos/asf/flink/blob/34e82f9d/tools/maven/suppressions-java.xml ---------------------------------------------------------------------- diff --git a/tools/maven/suppressions-java.xml b/tools/maven/suppressions-java.xml index 3b7d60b..9ddacba 100644 --- a/tools/maven/suppressions-java.xml +++ b/tools/maven/suppressions-java.xml @@ -60,10 +60,6 @@ under the License. checks="AvoidStarImport"/> <suppress - files="(.*)api[/\\]java[/\\]operators[/\\]([^/\\]*\.java)" - checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/> - - <suppress files="(.*)api[/\\]java[/\\]operator[/\\]([^/\\]*\.java)" checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>