[FLINK-4728] [core,optimizer] Replace reference equality with object equality

Fix some cases of testing Integer equality using == rather than
Integer.equals(Integer), and perform some additional cleanup.

This closes #2582


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9206b483
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9206b483
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9206b483

Branch: refs/heads/master
Commit: 9206b483b68bb41195bdf2da4f0b9c2de517c031
Parents: 10a42f9
Author: Greg Hogan <[email protected]>
Authored: Mon Oct 3 13:59:57 2016 -0400
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/operators/Ordering.java    |  6 ++---
 .../api/common/operators/util/FieldList.java    |  6 ++---
 .../api/common/io/DelimitedInputFormatTest.java |  1 -
 .../api/common/io/EnumerateNestedFilesTest.java |  1 -
 .../api/common/io/FileInputFormatTest.java      |  2 --
 .../base/OuterJoinOperatorBaseTest.java         | 13 ++++-----
 .../typeutils/base/BigDecComparatorTest.java    |  2 +-
 .../typeutils/base/BigDecSerializerTest.java    |  1 -
 .../typeutils/base/DoubleSerializerTest.java    |  9 +++----
 .../typeutils/base/SqlDateComparatorTest.java   |  2 +-
 .../api/java/typeutils/EitherTypeInfoTest.java  |  2 +-
 .../typeutils/runtime/EitherSerializerTest.java |  2 +-
 .../runtime/TupleComparatorTTT1Test.java        |  5 ++--
 .../apache/flink/types/CopyableValueTest.java   |  6 ++---
 .../java/org/apache/flink/types/RecordTest.java |  2 +-
 .../api/java/functions/FunctionAnnotation.java  | 12 ++++-----
 .../api/java/operators/PartitionOperator.java   |  2 +-
 .../apache/flink/api/java/sca/TaggedValue.java  |  4 +--
 .../java/functions/SemanticPropUtilTest.java    |  7 +++--
 .../api/java/operator/CrossOperatorTest.java    | 16 +++++------
 .../api/java/operator/DistinctOperatorTest.java |  4 +--
 .../flink/api/java/operator/GroupingTest.java   |  4 +--
 .../api/java/operator/JoinOperatorTest.java     | 21 +++++++--------
 .../api/java/operator/MaxByOperatorTest.java    |  2 +-
 .../api/java/operator/MinByOperatorTest.java    |  2 +-
 .../api/java/operator/ReduceOperatorTest.java   |  3 ---
 .../api/java/operator/SortPartitionTest.java    |  4 +--
 .../translation/DistinctTranslationTest.java    |  2 +-
 .../api/java/sca/UdfAnalyzerExamplesTest.java   |  2 +-
 .../flink/api/java/sca/UdfAnalyzerTest.java     |  2 +-
 .../flink/optimizer/dag/BulkIterationNode.java  | 18 ++++++-------
 .../optimizer/dag/WorksetIterationNode.java     |  2 +-
 .../dataproperties/GlobalProperties.java        |  8 +++---
 .../apache/flink/optimizer/plan/PlanNode.java   | 16 +++++------
 .../plandump/PlanJSONDumpGenerator.java         | 28 ++++++++++----------
 .../optimizer/AdditionalOperatorsTest.java      |  2 +-
 .../optimizer/BranchingPlansCompilerTest.java   |  2 +-
 .../optimizer/PartitioningReusageTest.java      |  2 +-
 .../SemanticPropertiesAPIToPlanTest.java        |  2 +-
 .../optimizer/dag/GroupCombineNodeTest.java     |  1 -
 .../GlobalPropertiesMatchingTest.java           | 14 +++++-----
 41 files changed, 116 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
index 7332698..afc659a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Ordering.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.common.operators;
 
-import java.util.ArrayList;
-
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.operators.util.FieldSet;
 
+import java.util.ArrayList;
+
 /**
  * This class represents an ordering on a set of fields. It specifies the 
fields and order direction
  * (ascending, descending).
@@ -145,7 +145,7 @@ public class Ordering implements Cloneable {
                }
                
                for (int i = 0; i < this.indexes.size(); i++) {
-                       if (this.indexes.get(i) != 
otherOrdering.indexes.get(i)) {
+                       if 
(!this.indexes.get(i).equals(otherOrdering.indexes.get(i))) {
                                return false;
                        }
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
index 15a993c..4cbde56 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/util/FieldList.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.api.common.operators.util;
 
+import org.apache.flink.annotation.Internal;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.annotation.Internal;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -166,7 +166,7 @@ public class FieldList extends FieldSet {
                        return false;
                } else {
                        for (int i = 0; i < this.size(); i++) {
-                               if (this.get(i) != list.get(i)) {
+                               if (!this.get(i).equals(list.get(i))) {
                                        return false;
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 8a31099..93d5f9f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -38,7 +38,6 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index 1076338..3ac17db 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.testutils.TestFileUtils;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index f66bd76..3e5d309 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -40,8 +40,6 @@ import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 
-import static org.junit.Assert.*;
-
 /**
  * Tests for the FileInputFormat
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
index 69159f2..683e164 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/OuterJoinOperatorBaseTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.api.common.operators.base;
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
@@ -29,6 +25,11 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 
 @SuppressWarnings("serial")
@@ -67,7 +68,7 @@ public class OuterJoinOperatorBaseTest implements 
Serializable {
 
        @Test
        public void testFullOuterJoinWithEmptyLeftInput() throws Exception {
-               final List<String> leftInput = Arrays.asList();
+               final List<String> leftInput = Collections.emptyList();
                final List<String> rightInput = Arrays.asList("foo", "bar", 
"foobar");
                
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
                List<String> expected = Arrays.asList("null,bar", "null,foo", 
"null,foobar");
@@ -77,7 +78,7 @@ public class OuterJoinOperatorBaseTest implements 
Serializable {
        @Test
        public void testFullOuterJoinWithEmptyRightInput() throws Exception {
                final List<String> leftInput = Arrays.asList("foo", "bar", 
"foobar");
-               final List<String> rightInput = Arrays.asList();
+               final List<String> rightInput = Collections.emptyList();
                
baseOperator.setOuterJoinType(OuterJoinOperatorBase.OuterJoinType.FULL);
                List<String> expected = Arrays.asList("bar,null", "foo,null", 
"foobar,null");
                testOuterJoin(leftInput, rightInput, expected);

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
index ca5819e..5bfdca2 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
+
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
index fd3cbd5..ff375d1 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.Random;
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
index 543c0e9..2e5de9f 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.api.common.typeutils.base;
 
-import java.util.Random;
-
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+
+import java.util.Random;
 /**
  * A test for the {@link DoubleSerializer}.
  */
@@ -46,8 +45,8 @@ public class DoubleSerializerTest extends 
SerializerTestBase<Double> {
        @Override
        protected Double[] getTestData() {
                Random rnd = new Random(874597969123412341L);
-               Double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
-               
+               double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
+
                return new Double[] {Double.valueOf(0), Double.valueOf(1), 
Double.valueOf(-1),
                                                        
Double.valueOf(Double.MAX_VALUE), Double.valueOf(Double.MIN_VALUE),
                                                        
Double.valueOf(rndDouble), Double.valueOf(-rndDouble),

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
index cedefe7..7133b01 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/SqlDateComparatorTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils.base;
 
 import java.sql.Date;
-import java.sql.Timestamp;
+
 import org.apache.flink.api.common.typeutils.ComparatorTestBase;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
index 78c10b1..a3d4568 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java
@@ -32,7 +32,7 @@ public class EitherTypeInfoTest extends TestLogger {
 
        Either<Integer, String> intEither = Either.Left(1);
        Either<Integer, String> stringEither = Either.Right("boo");
-       Either<Integer, Tuple2<Double, Long>> tuple2Either = new Right<>(new 
Tuple2<Double, Long>(42.0, 2l));
+       Either<Integer, Tuple2<Double, Long>> tuple2Either = new Right<>(new 
Tuple2<Double, Long>(42.0, 2L));
 
        @Test
        public void testEitherTypeEquality() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
index d636d5e..acf0d2e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java
@@ -60,7 +60,7 @@ public class EitherSerializerTest {
        public void testEitherWithTuple() {
 
        Either<Tuple2<Long, Long>, Double>[] testData = new Either[] {
-                       Either.Left(new Tuple2<>(2l, 9l)),
+                       Either.Left(new Tuple2<>(2L, 9L)),
                        new Left<>(new Tuple2<>(Long.MIN_VALUE, 
Long.MAX_VALUE)),
                        new Right<>(32.0),
                        Right(Double.MIN_VALUE),

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
index cf73be2..9db5cd9 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorTTT1Test.java
@@ -17,14 +17,11 @@
  */
 package org.apache.flink.api.java.typeutils.runtime;
 
-import static org.junit.Assert.assertEquals;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleComparator;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
-import org.apache.flink.api.common.typeutils.base.IntComparator;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringComparator;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -32,6 +29,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import 
org.apache.flink.api.java.typeutils.runtime.tuple.base.TupleComparatorTestBase;
 
+import static org.junit.Assert.assertEquals;
+
 public class TupleComparatorTTT1Test extends 
TupleComparatorTestBase<Tuple3<Tuple2<String, Double>, Tuple2<Long, Long>, 
Tuple2<Integer, Long>>> {
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java 
b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
index 76bdece..29dc436 100644
--- a/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/CopyableValueTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.types;
 
-import static org.junit.Assert.*;
-
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class CopyableValueTest {
 
        @Test
@@ -33,7 +33,7 @@ public class CopyableValueTest {
                        new DoubleValue(3.1415926535897932),
                        new FloatValue((float) 3.14159265),
                        new IntValue(42),
-                       new LongValue(42l),
+                       new LongValue(42L),
                        new NullValue(),
                        new ShortValue((short) 42),
                        new StringValue("QED")

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/RecordTest.java 
b/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
index d7e3edd..a081e8e 100644
--- a/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/RecordTest.java
@@ -285,7 +285,7 @@ public class RecordTest {
                        record.setNull(mask);
        
                        for (int i = 0; i < 58; i++) {
-                               if (((1l << i) & mask) != 0) {
+                               if (((1L << i) & mask) != 0) {
                                        assertTrue(record.getField(i, 
IntValue.class) == null);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
index d6fd913..f01d9d8 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/functions/FunctionAnnotation.java
@@ -18,19 +18,19 @@
 
 package org.apache.flink.api.java.functions;
 
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.InvalidProgramException;
+
 import java.lang.annotation.Annotation;
 import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.lang.annotation.Retention;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.InvalidProgramException;
-
 /**
  * This class defines Java annotations for semantic assertions that can be 
added to Flink functions.
  * Semantic annotations can help the Flink optimizer to generate more 
efficient execution plans for Flink programs.

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index 2ed0300..b3234b8 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -106,7 +106,7 @@ public class PartitionOperator<T> extends 
SingleInputOperator<T, T, PartitionOpe
 
        /**
         * Sets the order of keys for range partitioning.
-        * NOTE: Only valid for {@link PartitionMethod.RANGE}.
+        * NOTE: Only valid for {@link PartitionMethod#RANGE}.
         *
         * @param orders array of orders for each specified partition key
         * @return The partitioneOperator with properly set orders for given 
keys

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java 
b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
index 43450f7..cf0716d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/sca/TaggedValue.java
@@ -44,7 +44,7 @@ public class TaggedValue extends BasicValue {
                INPUT_1_ITERABLE, INPUT_2_ITERABLE, INPUT_1_ITERATOR, 
INPUT_2_ITERATOR, // input iterators
                ITERATOR_TRUE_ASSUMPTION, // boolean value that is "true" at 
least once
                NULL // null
-       };
+       }
 
        public static enum Input {
                INPUT_1(0), INPUT_2(1);
@@ -58,7 +58,7 @@ public class TaggedValue extends BasicValue {
                public int getId() {
                        return id;
                }
-       };
+       }
 
        private Tag tag;
        // only inputs can set this to true

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
index 372c0f7..b845e73 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropUtilTest.java
@@ -19,20 +19,19 @@
 
 package org.apache.flink.api.java.functions;
 
+import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import 
org.apache.flink.api.common.operators.SemanticProperties.InvalidSemanticAnnotationException;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.junit.Assert;
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
index 474563d..59d2d61 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
@@ -18,18 +18,18 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 
 public class CrossOperatorTest {
 
@@ -450,7 +450,7 @@ public class CrossOperatorTest {
                public long myLong;
                public String myString;
 
-               public CustomType() {};
+               public CustomType() {}
 
                public CustomType(int i, long l, String s) {
                        myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index 2e9bdf7..0bbeeb2 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -214,8 +214,8 @@ public class DistinctOperatorTest {
                public long myLong;
                public String myString;
                
-               public CustomType() {};
-               
+               public CustomType() {}
+
                public CustomType(int i, long l, String s) {
                        myInt = i;
                        myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index 9220095..18b17b5 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -606,8 +606,8 @@ public class GroupingTest {
                public String myString;
                public Nest nested;
                
-               public CustomType() {};
-               
+               public CustomType() {}
+
                public CustomType(int i, long l, String s) {
                        myInt = i;
                        myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index ae23382..0246f60 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.api.java.operator;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.operators.SemanticProperties;
@@ -32,7 +28,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.JoinOperator;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -44,6 +39,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
 import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
@@ -1167,7 +1166,7 @@ public class JoinOperatorTest {
 
                public int myInt;
 
-               public Nested() {};
+               public Nested() {}
 
                public Nested(int i, long l, String s) {
                        myInt = i;
@@ -1188,7 +1187,7 @@ public class JoinOperatorTest {
                public String myString;
                public Nested nest;
                
-               public NestedCustomType() {};
+               public NestedCustomType() {}
 
                public NestedCustomType(int i, long l, String s) {
                        myInt = i;
@@ -1214,8 +1213,8 @@ public class JoinOperatorTest {
                public List<String> countries;
                public Writable interfaceTest;
                
-               public CustomType() {};
-               
+               public CustomType() {}
+
                public CustomType(int i, long l, String s) {
                        myInt = i;
                        myLong = l;
@@ -1242,8 +1241,8 @@ public class JoinOperatorTest {
                public String myString;
                public Tuple2<Integer, String> intByString;
                
-               public CustomTypeWithTuple() {};
-               
+               public CustomTypeWithTuple() {}
+
                public CustomTypeWithTuple(int i, long l, String s) {
                        myInt = i;
                        myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
index 2af8a8c..cbb7690 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
@@ -211,7 +211,7 @@ public class MaxByOperatorTest {
                public String myString;
 
                public CustomType() {
-               };
+               }
 
                public CustomType(int i, long l, String s) {
                        myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
index 5d9c938..b9659c0 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
@@ -212,7 +212,7 @@ public class MinByOperatorTest {
                public String myString;
 
                public CustomType() {
-               };
+               }
 
                public CustomType(int i, long l, String s) {
                        myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
index dafc1f2..b7f7555 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/ReduceOperatorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.java.operator;
 
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -26,12 +25,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.ReduceOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
index 3540e6a..c3307ec 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operator/SortPartitionTest.java
@@ -263,8 +263,8 @@ public class SortPartitionTest {
                public String myString;
                public Nest nested;
                
-               public CustomType() {};
-               
+               public CustomType() {}
+
                public CustomType(int i, long l, String s) {
                        myInt = i;
                        myLong = l;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index cbdac4a..27c7b2f 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -261,7 +261,7 @@ public class DistinctTranslationTest {
                private static final long serialVersionUID = 1L;
                public int myInt;
 
-               public CustomType() {};
+               public CustomType() {}
 
                public CustomType(int i) {
                        myInt = i;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
index 01dc070..72a27b1 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerExamplesTest.java
@@ -204,7 +204,7 @@ public class UdfAnalyzerExamplesTest {
                                edge = edges.next();
                                Integer otherVertex = edge.getSecondVertex();
                                // collect unique vertices
-                               if(!otherVertices.contains(otherVertex) && 
otherVertex != groupVertex) {
+                               if(!otherVertices.contains(otherVertex) && 
!otherVertex.equals(groupVertex)) {
                                        this.otherVertices.add(otherVertex);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
index ac35793..c371082 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/sca/UdfAnalyzerTest.java
@@ -751,7 +751,7 @@ public class UdfAnalyzerTest {
                }
 
                private MyPojo recursiveFunction(MyPojo value) {
-                       if (value.field == "xyz") {
+                       if (value.field.equals("xyz")) {
                                value.field = value.field + "x";
                                return recursiveFunction(value);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
index 556e2e3..bf3c4ed 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BulkIterationNode.java
@@ -18,19 +18,12 @@
 
 package org.apache.flink.optimizer.dag;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import 
org.apache.flink.api.common.operators.SemanticProperties.EmptySemanticProperties;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.costs.CostEstimator;
 import org.apache.flink.optimizer.dag.WorksetIterationNode.SingleRootJoiner;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
@@ -45,12 +38,19 @@ import 
org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
 import org.apache.flink.optimizer.plan.Channel;
 import org.apache.flink.optimizer.plan.NamedChannel;
 import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import 
org.apache.flink.optimizer.plan.PlanNode.FeedbackPropertiesMeetRequirementsReport;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.traversals.InterestingPropertyVisitor;
 import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Visitor;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
 /**
  * A node in the optimizer's program representation for a bulk iteration.
  */
@@ -314,7 +314,7 @@ public class BulkIterationNode extends SingleInputNode 
implements IterationNode
                                
                                FeedbackPropertiesMeetRequirementsReport report 
= candidate.checkPartialSolutionPropertiesMet(pspn, atEndGlobal, atEndLocal);
                                if (report == 
FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-                                       ; // depends only through broadcast 
variable on the partial solution
+                                       // depends only through broadcast 
variable on the partial solution
                                }
                                else if (report == 
FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
                                        // attach a no-op node through which we 
create the properties of the original input

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
index 7969a94..d7ccaca 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/WorksetIterationNode.java
@@ -361,7 +361,7 @@ public class WorksetIterationNode extends TwoInputNode 
implements IterationNode
                                                                                
                                                                                
                        atEndGlobal, atEndLocal);
 
                                if (report == 
FeedbackPropertiesMeetRequirementsReport.NO_PARTIAL_SOLUTION) {
-                                       ; // depends only through broadcast 
variable on the workset solution
+                                       // depends only through broadcast 
variable on the workset solution
                                }
                                else if (report == 
FeedbackPropertiesMeetRequirementsReport.NOT_MET) {
                                        // attach a no-op node through which we 
create the properties of the original input

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index e64782f..654b054 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.optimizer.dataproperties;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -37,6 +34,9 @@ import 
org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
+
 /**
  * This class represents global properties of the data at a certain point in 
the plan.
  * Global properties are properties that describe data across different 
partitions, such as
@@ -224,7 +224,7 @@ public class GlobalProperties implements Cloneable {
                        }
                        
                        for (int i = 0; i < this.ordering.getNumberOfFields(); 
i++) {
-                               if (this.ordering.getFieldNumber(i) != 
o.getFieldNumber(i)) {
+                               if 
(!this.ordering.getFieldNumber(i).equals(o.getFieldNumber(i))) {
                                        return false;
                                }
                                

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index 9505a57..b30fa36 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.optimizer.plan;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.optimizer.CompilerException;
@@ -39,6 +33,12 @@ import 
org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.util.Visitable;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * The representation of a data exchange between two operators. The data 
exchange can realize a shipping strategy, 
  * which establishes global properties, and a local strategy, which 
establishes local properties.
@@ -547,7 +547,7 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
        // 
--------------------------------------------------------------------------------------------
        
        public static enum SourceAndDamReport {
-               NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM;
+               NOT_FOUND, FOUND_SOURCE, FOUND_SOURCE_AND_DAM
        }
        
        
@@ -568,6 +568,6 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
                MET,
                
                /** Indicates that the question whether the properties are met 
has been determined false */
-               NOT_MET;
+               NOT_MET
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
index 536e425..16bb47b 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plandump/PlanJSONDumpGenerator.java
@@ -18,19 +18,6 @@
 
 package org.apache.flink.optimizer.plandump;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.flink.api.common.operators.CompilerHints;
 import org.apache.flink.optimizer.CompilerException;
@@ -55,6 +42,19 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.StringUtils;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
 
 public class PlanJSONDumpGenerator {
        
@@ -160,7 +160,7 @@ public class PlanJSONDumpGenerator {
                        //to set first to false!
                        if (visit(child, writer, first)) {
                                first = false;
-                       };
+                       }
                }
                
                // check if this node should be skipped from the dump

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
index 656a323..b9b6d8a 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
@@ -77,7 +77,7 @@ public class AdditionalOperatorsTest extends CompilerTestBase 
{
                DataSet<Long> set2 = env.generateSequence(0,1);
 
                set1.crossWithHuge(set2).name("Cross")
-                               .output(new DiscardingOutputFormat<Tuple2<Long, 
Long>>());;
+                               .output(new DiscardingOutputFormat<Tuple2<Long, 
Long>>());
 
                try {
                        Plan plan = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
index 1f8904a..2b3136a 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
@@ -505,7 +505,7 @@ public class BranchingPlansCompilerTest extends 
CompilerTestBase {
 
                loopRes.output(new DiscardingOutputFormat<Long>());
                loopRes.map(new IdentityMapper<Long>())
-                               .output(new DiscardingOutputFormat<Long>());;
+                               .output(new DiscardingOutputFormat<Long>());
 
                Plan plan = env.createProgramPlan();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
index e4cf1c8..ac4c090 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/PartitioningReusageTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
index df13da8..fe79704 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/SemanticPropertiesAPIToPlanTest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
index f4776a0..f3e124b 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dag/GroupCombineNodeTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.optimizer.dag;
 import org.apache.flink.api.common.operators.SemanticProperties;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.configuration.Configuration;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/9206b483/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
index 52826d6..ddfa074 100644
--- 
a/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
+++ 
b/flink-optimizer/src/test/java/org/apache/flink/optimizer/dataproperties/GlobalPropertiesMatchingTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.optimizer.dataproperties;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
@@ -28,6 +26,10 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.junit.Test;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class GlobalPropertiesMatchingTest {
 
        @Test
@@ -180,12 +182,12 @@ public class GlobalPropertiesMatchingTest {
                        assertFalse(req.isMetBy(gp3));
 
                        GlobalProperties gp4 = new GlobalProperties();
-                       gp3.setAnyPartitioning(new FieldList(6, 1));
-                       assertFalse(req.isMetBy(gp3));
+                       gp4.setAnyPartitioning(new FieldList(6, 1));
+                       assertFalse(req.isMetBy(gp4));
 
                        GlobalProperties gp5 = new GlobalProperties();
-                       gp4.setAnyPartitioning(new FieldList(2));
-                       assertFalse(req.isMetBy(gp4));
+                       gp5.setAnyPartitioning(new FieldList(2));
+                       assertFalse(req.isMetBy(gp5));
                }
 
                // match hash partitioning

Reply via email to