[FLINK-6722] [table] Activate strict checkstyle

This closes #4021.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceaf5b61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceaf5b61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceaf5b61

Branch: refs/heads/master
Commit: ceaf5b611090cec51dd1d2af1681eb496912b993
Parents: 2f8cacd
Author: Greg Hogan <[email protected]>
Authored: Thu May 25 17:20:56 2017 -0400
Committer: zentol <[email protected]>
Committed: Thu Jun 1 11:15:06 2017 +0200

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |  34 +++
 .../flink/table/annotation/TableType.java       |   1 +
 .../flink/table/api/java/package-info.java      |   1 +
 .../org/apache/flink/table/explain/Node.java    |  60 ++++--
 .../flink/table/explain/PlanJsonParser.java     |  48 +++--
 .../resources/tableSourceConverter.properties   |   2 +-
 .../api/java/batch/TableEnvironmentITCase.java  |  49 +++--
 .../table/api/java/batch/TableSourceITCase.java |   4 +
 .../api/java/batch/sql/GroupingSetsITCase.java  |  10 +-
 .../table/api/java/batch/sql/SqlITCase.java     |  16 +-
 .../table/api/java/stream/sql/SqlITCase.java    |  32 +--
 .../api/java/stream/utils/StreamTestData.java   |   3 +
 .../api/java/utils/UserDefinedAggFunctions.java | 205 ++++++++++---------
 .../java/utils/UserDefinedScalarFunctions.java  |  22 +-
 .../java/utils/UserDefinedTableFunctions.java   |   7 +
 .../table/api/scala/batch/ExplainTest.scala     |   8 +-
 16 files changed, 325 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index 15a9d07..855a520 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -251,6 +251,40 @@ under the License.
                                </configuration>
                        </plugin>
 
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-checkstyle-plugin</artifactId>
+                               <version>2.17</version>
+                               <dependencies>
+                                       <dependency>
+                                               
<groupId>com.puppycrawl.tools</groupId>
+                                               
<artifactId>checkstyle</artifactId>
+                                               <version>6.19</version>
+                                       </dependency>
+                               </dependencies>
+                               <configuration>
+                                       
<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+                                       
<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+                                       
<includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       
<logViolationsToConsole>true</logViolationsToConsole>
+                                       <failOnViolation>true</failOnViolation>
+                               </configuration>
+                               <executions>
+                                       <!--
+                                       Execute checkstyle after compilation 
but before tests.
+
+                                       This ensures that any parsing or type 
checking errors are from
+                                       javac, so they look as expected. Beyond 
that, we want to
+                                       fail as early as possible.
+                                       -->
+                                       <execution>
+                                               <phase>test-compile</phase>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 3845eae..2d2a7af 100644
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.annotation;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.table.catalog.TableSourceConverter;
+
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
index 3dbf50f..50d41a2 100644
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
@@ -61,6 +61,7 @@
  * {@link 
org.apache.flink.table.api.java.StreamTableEnvironment#toAppendStream(Table, 
java.lang.Class)}}, or
  * {@link 
org.apache.flink.table.api.java.StreamTableEnvironment#toRetractStream(Table, 
java.lang.Class)}}.
  */
+
 package org.apache.flink.table.api.java;
 
 import org.apache.flink.table.api.Table;

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
index 4616728..6317d0c 100644
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
@@ -20,77 +20,93 @@ package org.apache.flink.table.explain;
 
 import java.util.List;
 
+/**
+ * Field hierarchy of an execution plan.
+ */
 public class Node {
        private int id;
        private String type;
        private String pact;
        private String contents;
        private int parallelism;
-       private String driver_strategy;
+       private String driverStrategy;
        private List<Predecessors> predecessors;
-       private List<Global_properties> global_properties;
-       private List<LocalProperty> local_properties;
+       private List<GlobalProperties> globalProperties;
+       private List<LocalProperty> localProperties;
        private List<Estimates> estimates;
        private List<Costs> costs;
-       private List<Compiler_hints> compiler_hints;
+       private List<CompilerHints> compilerHints;
 
        public int getId() {
                return id;
        }
+
        public String getType() {
                return type;
        }
+
        public String getPact() {
                return pact;
        }
+
        public String getContents() {
                return contents;
        }
+
        public int getParallelism() {
                return parallelism;
        }
-       public String getDriver_strategy() {
-               return driver_strategy;
+
+       public String getDriverStrategy() {
+               return driverStrategy;
        }
+
        public List<Predecessors> getPredecessors() {
                return predecessors;
        }
-       public List<Global_properties> getGlobal_properties() {
-               return global_properties;
+
+       public List<GlobalProperties> getGlobalProperties() {
+               return globalProperties;
        }
-       public List<LocalProperty> getLocal_properties() {
-               return local_properties;
+
+       public List<LocalProperty> getLocalProperties() {
+               return localProperties;
        }
+
        public List<Estimates> getEstimates() {
                return estimates;
        }
+
        public List<Costs> getCosts() {
                return costs;
        }
-       public List<Compiler_hints> getCompiler_hints() {
-               return compiler_hints;
+
+       public List<CompilerHints> getCompilerHints() {
+               return compilerHints;
        }
 }
 
 class Predecessors {
-       private String ship_strategy;
-       private String exchange_mode;
+       private String shipStrategy;
+       private String exchangeMode;
 
-       public String getShip_strategy() {
-               return ship_strategy;
+       public String getShipStrategy() {
+               return shipStrategy;
        }
-       public String getExchange_mode() {
-               return exchange_mode;
+
+       public String getExchangeMode() {
+               return exchangeMode;
        }
 }
 
-class Global_properties {
+class GlobalProperties {
        private String name;
        private String value;
 
        public String getValue() {
                return value;
        }
+
        public String getName() {
                return name;
        }
@@ -103,6 +119,7 @@ class LocalProperty {
        public String getValue() {
                return value;
        }
+
        public String getName() {
                return name;
        }
@@ -115,6 +132,7 @@ class Estimates {
        public String getValue() {
                return value;
        }
+
        public String getName() {
                return name;
        }
@@ -127,18 +145,20 @@ class Costs {
        public String getValue() {
                return value;
        }
+
        public String getName() {
                return name;
        }
 }
 
-class Compiler_hints {
+class CompilerHints {
        private String name;
        private String value;
 
        public String getValue() {
                return value;
        }
+
        public String getName() {
                return name;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
index f13c042..ee9b9da 100644
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
+++ 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.table.explain;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+/**
+ * Utility for converting an execution plan from JSON to a human-readable 
string.
+ */
 public class PlanJsonParser {
 
        public static String getSqlExecutionPlan(String t, Boolean extended) 
throws Exception {
                ObjectMapper objectMapper = new ObjectMapper();
+               
objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
 
-               //not every node is same, ignore the unknown field
+               // not every node is same, ignore the unknown field
                
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
 
                PlanTree tree = objectMapper.readValue(t, PlanTree.class);
@@ -43,7 +48,7 @@ public class PlanJsonParser {
                for (int index = 0; index < tree.getNodes().size(); index++) {
                        Node tempNode = tree.getNodes().get(index);
 
-                       //input with operation such as join or union is 
coordinate, keep the same indent 
+                       // input with operation such as join or union is 
coordinate, keep the same indent
                        if ((tempNode.getPact().equals("Data Source")) && 
(map.containsKey(tempNode.getPact()))) {
                                tabCount = map.get(tempNode.getPact());
                        }
@@ -57,15 +62,15 @@ public class PlanJsonParser {
                        printTab(tabCount + 1, pw);
                        String content = tempNode.getContents();
 
-                       //drop the hashcode of object instance
+                       // drop the hashcode of object instance
                        int dele = tempNode.getContents().indexOf("@");
                        if (dele > -1) {
                                content = tempNode.getContents().substring(0, 
dele);
                        }
 
-                       //replace with certain content if node is dataSource to 
pass
-                       //unit tests, because java and scala use different api 
to
-                       //get input element
+                       // replace with certain content if node is dataSource 
to pass
+                       // unit tests, because java and scala use different api 
to
+                       // get input element
                        if (tempNode.getPact().equals("Data Source")) {
                                content = "collect elements with 
CollectionInputFormat";
                        }
@@ -74,35 +79,35 @@ public class PlanJsonParser {
                        List<Predecessors> predecessors = 
tempNode.getPredecessors();
                        if (predecessors != null) {
                                printTab(tabCount + 1, pw);
-                               pw.print("ship_strategy : " + 
predecessors.get(0).getShip_strategy() + "\n");
+                               pw.print("ship_strategy : " + 
predecessors.get(0).getShipStrategy() + "\n");
 
-                               String mode = 
predecessors.get(0).getExchange_mode();
+                               String mode = 
predecessors.get(0).getExchangeMode();
                                if (mode != null) {
                                        printTab(tabCount + 1, pw);
                                        pw.print("exchange_mode : " + mode + 
"\n");
                                }
                        }
 
-                       if (tempNode.getDriver_strategy() != null) {
+                       if (tempNode.getDriverStrategy() != null) {
                                printTab(tabCount + 1, pw);
-                               pw.print("driver_strategy : " + 
tempNode.getDriver_strategy() + "\n");
+                               pw.print("driver_strategy : " + 
tempNode.getDriverStrategy() + "\n");
                        }
 
-                       if (tempNode.getGlobal_properties() != null) {
+                       if (tempNode.getGlobalProperties() != null) {
                                printTab(tabCount + 1, pw);
-                               
pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
-                                       + 
tempNode.getGlobal_properties().get(0).getValue() + "\n");
+                               
pw.print(tempNode.getGlobalProperties().get(0).getName() + " : "
+                                       + 
tempNode.getGlobalProperties().get(0).getValue() + "\n");
                        }
 
                        if (extended) {
-                               List<Global_properties> globalProperties = 
tempNode.getGlobal_properties();
+                               List<GlobalProperties> globalProperties = 
tempNode.getGlobalProperties();
                                for (int i = 1; i < globalProperties.size(); 
i++) {
                                        printTab(tabCount + 1, pw);
                                        
pw.print(globalProperties.get(i).getName() + " : "
                                        + globalProperties.get(i).getValue() + 
"\n");
                                }
 
-                               List<LocalProperty> localProperties = 
tempNode.getLocal_properties();
+                               List<LocalProperty> localProperties = 
tempNode.getLocalProperties();
                                for (int i = 0; i < localProperties.size(); 
i++) {
                                        printTab(tabCount + 1, pw);
                                        
pw.print(localProperties.get(i).getName() + " : "
@@ -123,11 +128,11 @@ public class PlanJsonParser {
                                        + costs.get(i).getValue() + "\n");
                                }
 
-                               List<Compiler_hints> compilerHintses = 
tempNode.getCompiler_hints();
-                               for (int i = 0; i < compilerHintses.size(); 
i++) {
+                               List<CompilerHints> compilerHints = 
tempNode.getCompilerHints();
+                               for (int i = 0; i < compilerHints.size(); i++) {
                                        printTab(tabCount + 1, pw);
-                                       
pw.print(compilerHintses.get(i).getName() + " : "
-                                       + compilerHintses.get(i).getValue() + 
"\n");
+                                       pw.print(compilerHints.get(i).getName() 
+ " : "
+                                       + compilerHints.get(i).getValue() + 
"\n");
                                }
                        }
                        tabCount++;
@@ -138,8 +143,9 @@ public class PlanJsonParser {
        }
 
        private static void printTab(int tabCount, PrintWriter pw) {
-               for (int i = 0; i < tabCount; i++)
+               for (int i = 0; i < tabCount; i++) {
                        pw.print("\t");
+               }
        }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
 
b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
index 1632e12..d548f48 100644
--- 
a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
+++ 
b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
@@ -26,4 +26,4 @@
 # which offers converters instead of put all information into the
 # tableSourceConverter.properties of flink-table module.
 
################################################################################
-scan.packages=org.apache.flink.table.sources
\ No newline at end of file
+scan.packages=org.apache.flink.table.sources

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index 3bb283f..aac7e11 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -18,38 +18,43 @@
 
 package org.apache.flink.table.api.java.batch;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.calcite.tools.RuleSets;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.types.Row;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.CalciteConfigBuilder;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
+import org.apache.calcite.tools.RuleSets;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Integration tests for {@link BatchTableEnvironment}.
+ */
 @RunWith(Parameterized.class)
 public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 
@@ -401,7 +406,7 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
 
                // use null value the enforce GenericType
-               DataSet<Row> dataSet = env.fromElements(Row.of((Integer)null));
+               DataSet<Row> dataSet = env.fromElements(Row.of((Integer) null));
                assertTrue(dataSet.getType() instanceof GenericTypeInfo);
                assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
 
@@ -482,10 +487,16 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
 
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Non-static class.
+        */
        public class MyNonStatic {
                public int number;
        }
 
+       /**
+        * Small POJO.
+        */
        @SuppressWarnings("unused")
        public static class SmallPojo {
 
@@ -506,6 +517,9 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                public Integer[] roles;
        }
 
+       /**
+        * POJO with generic fields.
+        */
        @SuppressWarnings("unused")
        public static class PojoWithGeneric {
                public String name;
@@ -531,6 +545,9 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                }
        }
 
+       /**
+        * Small POJO with private fields.
+        */
        @SuppressWarnings("unused")
        public static class PrivateSmallPojo {
 
@@ -581,6 +598,9 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                }
        }
 
+       /**
+        * Another small POJO.
+        */
        @SuppressWarnings("unused")
        public static class SmallPojo2 {
 
@@ -606,6 +626,9 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                }
        }
 
+       /**
+        * Another small POJO with private fields.
+        */
        @SuppressWarnings("unused")
        public static class PrivateSmallPojo2 {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index a7ccb7e..864d4f8 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -27,12 +27,16 @@ import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestB
 import org.apache.flink.table.sources.BatchTableSource;
 import org.apache.flink.table.utils.CommonTestData;
 import org.apache.flink.types.Row;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.List;
 
+/**
+ * Integration tests for {@link BatchTableSource}.
+ */
 @RunWith(Parameterized.class)
 public class TableSourceITCase extends TableProgramsCollectionTestBase {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index 3f611d5..6c1a753 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
-import java.util.Comparator;
-import java.util.List;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -33,11 +31,15 @@ import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Comparator;
+import java.util.List;
+
 /**
  * This test should be replaced by a DataSetAggregateITCase.
  * We should only perform logical unit tests here.
@@ -46,8 +48,8 @@ import org.junit.runners.Parameterized;
 @RunWith(Parameterized.class)
 public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 
-       private final static String TABLE_NAME = "MyTable";
-       private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
+       private static final String TABLE_NAME = "MyTable";
+       private static final String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
        private BatchTableEnvironment tableEnv;
 
        public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode 
tableConfigMode) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 114226c..f4e5daf 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -22,17 +22,18 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -42,6 +43,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Integration tests for batch SQL.
+ */
 @RunWith(Parameterized.class)
 public class SqlITCase extends TableProgramsCollectionTestBase {
 
@@ -136,7 +140,7 @@ public class SqlITCase extends 
TableProgramsCollectionTestBase {
                DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
 
                tableEnv.registerDataSet("t1", ds1, "a, b, c");
-               tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
+               tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
 
                String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
                Table result = tableEnv.sql(sqlQuery);

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index d827cd6..9270221 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -18,27 +18,31 @@
 
 package org.apache.flink.table.api.java.stream.sql;
 
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+
 import org.junit.Test;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Integration tests for streaming SQL.
+ */
 public class SqlITCase extends StreamingMultipleProgramsTestBase {
-       
+
        @Test
        public void testRowRegisterRowWithNames() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -49,17 +53,17 @@ public class SqlITCase extends 
StreamingMultipleProgramsTestBase {
                data.add(Row.of(1, 1L, "Hi"));
                data.add(Row.of(2, 2L, "Hello"));
                data.add(Row.of(3, 2L, "Hello world"));
-               
+
                TypeInformation<?>[] types = {
                                BasicTypeInfo.INT_TYPE_INFO,
                                BasicTypeInfo.LONG_TYPE_INFO,
                                BasicTypeInfo.STRING_TYPE_INFO};
-               String names[] = {"a","b","c"};
-               
+               String[] names = {"a", "b", "c"};
+
                RowTypeInfo typeInfo = new RowTypeInfo(types, names);
-               
+
                DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
-               
+
                Table in = tableEnv.fromDataStream(ds, "a,b,c");
                tableEnv.registerTable("MyTableRow", in);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
index 139801f..a23bc5a 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
@@ -27,6 +27,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * Test data.
+ */
 public class StreamTestData {
 
        public static DataStream<Tuple3<Integer, Long, String>> 
getSmall3TupleDataSet(StreamExecutionEnvironment env) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
index a51a4af..94c5c90 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -22,98 +23,116 @@ import org.apache.flink.table.functions.AggregateFunction;
 
 import java.util.Iterator;
 
+/**
+ * Test aggregator functions.
+ */
 public class UserDefinedAggFunctions {
-    // Accumulator for test requiresOver
-    public static class Accumulator0 extends Tuple2<Long, Integer>{}
-
-    // Test for requiresOver
-    public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> 
{
-        @Override
-        public Accumulator0 createAccumulator() {
-            return new Accumulator0();
-        }
-
-        @Override
-        public Long getValue(Accumulator0 accumulator) {
-            return 1L;
-        }
-
-        //Overloaded accumulate method
-        public void accumulate(Accumulator0 accumulator, long iValue, int 
iWeight) {
-        }
-
-        @Override
-        public boolean requiresOver() {
-            return true;
-        }
-    }
-
-    // Accumulator for WeightedAvg
-    public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
-        public long sum = 0;
-        public int count = 0;
-    }
-
-    // Base class for WeightedAvg
-    public static class WeightedAvg extends AggregateFunction<Long, 
WeightedAvgAccum> {
-        @Override
-        public WeightedAvgAccum createAccumulator() {
-            return new WeightedAvgAccum();
-        }
-
-        @Override
-        public Long getValue(WeightedAvgAccum accumulator) {
-            if (accumulator.count == 0)
-                return null;
-            else
-                return accumulator.sum/accumulator.count;
-        }
-
-        //Overloaded accumulate method
-        public void accumulate(WeightedAvgAccum accumulator, long iValue, int 
iWeight) {
-            accumulator.sum += iValue * iWeight;
-            accumulator.count += iWeight;
-        }
-
-        //Overloaded accumulate method
-        public void accumulate(WeightedAvgAccum accumulator, int iValue, int 
iWeight) {
-            accumulator.sum += iValue * iWeight;
-            accumulator.count += iWeight;
-        }
-    }
-
-    // A WeightedAvg class with merge method
-    public static class WeightedAvgWithMerge extends WeightedAvg {
-        public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) 
{
-            Iterator<WeightedAvgAccum> iter = it.iterator();
-            while (iter.hasNext()) {
-                WeightedAvgAccum a = iter.next();
-                acc.count += a.count;
-                acc.sum += a.sum;
-            }
-        }
-    }
-
-    // A WeightedAvg class with merge and reset method
-    public static class WeightedAvgWithMergeAndReset extends 
WeightedAvgWithMerge {
-        public void resetAccumulator(WeightedAvgAccum acc) {
-            acc.count = 0;
-            acc.sum = 0L;
-        }
-    }
-
-    // A WeightedAvg class with retract method
-    public static class WeightedAvgWithRetract extends WeightedAvg {
-        //Overloaded retract method
-        public void retract(WeightedAvgAccum accumulator, long iValue, int 
iWeight) {
-            accumulator.sum -= iValue * iWeight;
-            accumulator.count -= iWeight;
-        }
-
-        //Overloaded retract method
-        public void retract(WeightedAvgAccum accumulator, int iValue, int 
iWeight) {
-            accumulator.sum -= iValue * iWeight;
-            accumulator.count -= iWeight;
-        }
-    }
+       /**
+        * Accumulator for test requiresOver.
+        */
+       public static class Accumulator0 extends Tuple2<Long, Integer>{}
+
+       /**
+        * Test for requiresOver.
+        */
+       public static class OverAgg0 extends AggregateFunction<Long, 
Accumulator0> {
+               @Override
+               public Accumulator0 createAccumulator() {
+                       return new Accumulator0();
+               }
+
+               @Override
+               public Long getValue(Accumulator0 accumulator) {
+                       return 1L;
+               }
+
+               //Overloaded accumulate method
+               public void accumulate(Accumulator0 accumulator, long iValue, 
int iWeight) {
+               }
+
+               @Override
+               public boolean requiresOver() {
+                       return true;
+               }
+       }
+
+       /**
+        * Accumulator for WeightedAvg.
+        */
+       public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
+               public long sum = 0;
+               public int count = 0;
+       }
+
+       /**
+        * Base class for WeightedAvg.
+        */
+       public static class WeightedAvg extends AggregateFunction<Long, 
WeightedAvgAccum> {
+               @Override
+               public WeightedAvgAccum createAccumulator() {
+                       return new WeightedAvgAccum();
+               }
+
+               @Override
+               public Long getValue(WeightedAvgAccum accumulator) {
+                       if (accumulator.count == 0) {
+                               return null;
+                       } else {
+                               return accumulator.sum / accumulator.count;
+                       }
+               }
+
+               // overloaded accumulate method
+               public void accumulate(WeightedAvgAccum accumulator, long 
iValue, int iWeight) {
+                       accumulator.sum += iValue * iWeight;
+                       accumulator.count += iWeight;
+               }
+
+               //Overloaded accumulate method
+               public void accumulate(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
+                       accumulator.sum += iValue * iWeight;
+                       accumulator.count += iWeight;
+               }
+       }
+
+       /**
+        * A WeightedAvg class with merge method.
+        */
+       public static class WeightedAvgWithMerge extends WeightedAvg {
+               public void merge(WeightedAvgAccum acc, 
Iterable<WeightedAvgAccum> it) {
+                       Iterator<WeightedAvgAccum> iter = it.iterator();
+                       while (iter.hasNext()) {
+                               WeightedAvgAccum a = iter.next();
+                               acc.count += a.count;
+                               acc.sum += a.sum;
+                       }
+               }
+       }
+
+       /**
+        * A WeightedAvg class with merge and reset method.
+        */
+       public static class WeightedAvgWithMergeAndReset extends 
WeightedAvgWithMerge {
+               public void resetAccumulator(WeightedAvgAccum acc) {
+                       acc.count = 0;
+                       acc.sum = 0L;
+               }
+       }
+
+       /**
+        * A WeightedAvg class with retract method.
+        */
+       public static class WeightedAvgWithRetract extends WeightedAvg {
+               //Overloaded retract method
+               public void retract(WeightedAvgAccum accumulator, long iValue, 
int iWeight) {
+                       accumulator.sum -= iValue * iWeight;
+                       accumulator.count -= iWeight;
+               }
+
+               //Overloaded retract method
+               public void retract(WeightedAvgAccum accumulator, int iValue, 
int iWeight) {
+                       accumulator.sum -= iValue * iWeight;
+                       accumulator.count -= iWeight;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
index 1e5fabe..214dbea 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
@@ -15,25 +15,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
-import java.util.Arrays;
 import org.apache.flink.table.functions.ScalarFunction;
 
+import java.util.Arrays;
+
+/**
+ * Test scalar functions.
+ */
 public class UserDefinedScalarFunctions {
 
+       /**
+        * Increment input.
+        */
        public static class JavaFunc0 extends ScalarFunction {
                public long eval(Long l) {
                        return l + 1;
                }
        }
 
+       /**
+        * Concatenate inputs as strings.
+        */
        public static class JavaFunc1 extends ScalarFunction {
                public String eval(Integer a, int b,  Long c) {
                        return a + " and " + b + " and " + c;
                }
        }
 
+       /**
+        * Append product to string.
+        */
        public static class JavaFunc2 extends ScalarFunction {
                public String eval(String s, Integer... a) {
                        int m = 1;
@@ -44,6 +58,9 @@ public class UserDefinedScalarFunctions {
                }
        }
 
+       /**
+        * Test overloading.
+        */
        public static class JavaFunc3 extends ScalarFunction {
                public int eval(String a, int... b) {
                        return b.length;
@@ -54,6 +71,9 @@ public class UserDefinedScalarFunctions {
                }
        }
 
+       /**
+        * Concatenate arrays as strings.
+        */
        public static class JavaFunc4 extends ScalarFunction {
                public String eval(Integer[] a, String[] b) {
                        return Arrays.toString(a) + " and " + 
Arrays.toString(b);

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
index 3af8646..63c07ed 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
@@ -15,12 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
 import org.apache.flink.table.functions.TableFunction;
 
+/**
+ * Test functions.
+ */
 public class UserDefinedTableFunctions {
 
+       /**
+        * Emit inputs as long.
+        */
        public static class JavaTableFunc0 extends TableFunction<Long> {
                public void eval(Integer a, Long b, Long c) {
                        collect(a.longValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/ceaf5b61/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
index 1a6b314..10af4d7 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
@@ -42,7 +42,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       
"../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", 
"\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -57,7 +57,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       
"../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", 
"\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -102,7 +102,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       
"../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", 
"\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -117,7 +117,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       
"../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", 
"\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
 }

Reply via email to