[hotfix] Minor cleanup of warnings, comments, and code style in the Java API 
Utils


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93654413
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93654413
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93654413

Branch: refs/heads/master
Commit: 9365441396efdcf852e9076bdb6ca0fcc841434c
Parents: e9a5358
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jan 14 22:08:41 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 15 11:44:21 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/Utils.java   | 68 ++++++++++++--------
 1 file changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93654413/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java 
b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
index cb10906..038b58c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java
@@ -19,12 +19,14 @@
 package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -32,7 +34,6 @@ import java.lang.reflect.Modifier;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.flink.configuration.Configuration;
 import static 
org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;
 
 /**
@@ -63,19 +64,28 @@ public final class Utils {
         *
         * @param typeInfo {@link CompositeType}
         */
-       public static void getContainedGenericTypes(CompositeType typeInfo, 
List<GenericTypeInfo<?>> target) {
-               for(int i = 0; i < typeInfo.getArity(); i++) {
+       public static void getContainedGenericTypes(CompositeType<?> typeInfo, 
List<GenericTypeInfo<?>> target) {
+               for (int i = 0; i < typeInfo.getArity(); i++) {
                        TypeInformation<?> type = typeInfo.getTypeAt(i);
-                       if(type instanceof CompositeType) {
-                               getContainedGenericTypes((CompositeType) type, 
target);
-                       } else if(type instanceof GenericTypeInfo) {
-                               if(!target.contains(type)) {
+                       if (type instanceof CompositeType) {
+                               getContainedGenericTypes((CompositeType<?>) 
type, target);
+                       } else if (type instanceof GenericTypeInfo) {
+                               if (!target.contains(type)) {
                                        target.add((GenericTypeInfo<?>) type);
                                }
                        }
                }
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       
+       /**
+        * Utility sink function that counts elements and writes the count into 
an accumulator,
+        * from which it can be retrieved by the client. This sink is used by 
the
+        * {@link DataSet#count()} function.
+        * 
+        * @param <T> Type of elements to count.
+        */
        @SkipCodeAnalysis
        public static class CountHelper<T> extends RichOutputFormat<T> {
 
@@ -90,24 +100,29 @@ public final class Utils {
                }
 
                @Override
-               public void configure(Configuration parameters) {
-               }
+               public void configure(Configuration parameters) {}
 
                @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {
-               }
+               public void open(int taskNumber, int numTasks) {}
 
                @Override
-               public void writeRecord(T record) throws IOException {
+               public void writeRecord(T record) {
                        counter++;
                }
 
                @Override
-               public void close() throws IOException {
+               public void close() {
                        getRuntimeContext().getLongCounter(id).add(counter);
                }
        }
 
+       /**
+        * Utility sink function that collects elements into an accumulator,
+        * from which it they can be retrieved by the client. This sink is used 
by the
+        * {@link DataSet#collect()} function.
+        *
+        * @param <T> Type of elements to count.
+        */
        @SkipCodeAnalysis
        public static class CollectHelper<T> extends RichOutputFormat<T> {
 
@@ -124,11 +139,10 @@ public final class Utils {
                }
 
                @Override
-               public void configure(Configuration parameters) {
-               }
+               public void configure(Configuration parameters) {}
 
                @Override
-               public void open(int taskNumber, int numTasks) throws 
IOException {
+               public void open(int taskNumber, int numTasks)  {
                        this.accumulator = new SerializedListAccumulator<>();
                }
 
@@ -138,13 +152,12 @@ public final class Utils {
                }
 
                @Override
-               public void close() throws IOException {
+               public void close() {
                        // Important: should only be added in close method to 
minimize traffic of accumulators
                        getRuntimeContext().addAccumulator(id, accumulator);
                }
        }
 
-
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -157,16 +170,16 @@ public final class Utils {
 
        private static <T> String getSerializerTree(TypeInformation<T> ti, int 
indent) {
                String ret = "";
-               if(ti instanceof CompositeType) {
+               if (ti instanceof CompositeType) {
                        ret += StringUtils.repeat(' ', indent) + 
ti.getClass().getSimpleName()+"\n";
                        CompositeType<T> cti = (CompositeType<T>) ti;
                        String[] fieldNames = cti.getFieldNames();
-                       for(int i = 0; i < cti.getArity(); i++) {
-                               TypeInformation fieldType = cti.getTypeAt(i);
+                       for (int i = 0; i < cti.getArity(); i++) {
+                               TypeInformation<?> fieldType = cti.getTypeAt(i);
                                ret += StringUtils.repeat(' ', indent + 2) + 
fieldNames[i]+":"+getSerializerTree(fieldType, indent);
                        }
                } else {
-                       if(ti instanceof GenericTypeInfo) {
+                       if (ti instanceof GenericTypeInfo) {
                                ret += StringUtils.repeat(' ', indent) + 
"GenericTypeInfo ("+ti.getTypeClass().getSimpleName()+")\n";
                                ret += getGenericTypeTree(ti.getTypeClass(), 
indent + 4);
                        } else {
@@ -176,14 +189,15 @@ public final class Utils {
                return ret;
        }
 
-       private static String getGenericTypeTree(Class type, int indent) {
+       private static String getGenericTypeTree(Class<?> type, int indent) {
                String ret = "";
-               for(Field field : type.getDeclaredFields()) {
-                       if(Modifier.isStatic(field.getModifiers()) || 
Modifier.isTransient(field.getModifiers())) {
+               for (Field field : type.getDeclaredFields()) {
+                       if (Modifier.isStatic(field.getModifiers()) || 
Modifier.isTransient(field.getModifiers())) {
                                continue;
                        }
-                       ret += StringUtils.repeat(' ', indent) + 
field.getName() + ":" + field.getType().getName() + (field.getType().isEnum() ? 
" (is enum)" : "") + "\n";
-                       if(!field.getType().isPrimitive()) {
+                       ret += StringUtils.repeat(' ', indent) + 
field.getName() + ":" + field.getType().getName() + 
+                               (field.getType().isEnum() ? " (is enum)" : "") 
+ "\n";
+                       if (!field.getType().isPrimitive()) {
                                ret += getGenericTypeTree(field.getType(), 
indent + 4);
                        }
                }

Reply via email to