[hotfix] Minor cleanup of warnings, comments, and code style in the Java API Utils
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93654413 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93654413 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93654413 Branch: refs/heads/master Commit: 9365441396efdcf852e9076bdb6ca0fcc841434c Parents: e9a5358 Author: Stephan Ewen <se...@apache.org> Authored: Thu Jan 14 22:08:41 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Jan 15 11:44:21 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/flink/api/java/Utils.java | 68 ++++++++++++-------- 1 file changed, 41 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/93654413/flink-java/src/main/java/org/apache/flink/api/java/Utils.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java index cb10906..038b58c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/Utils.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/Utils.java @@ -19,12 +19,14 @@ package org.apache.flink.api.java; import org.apache.commons.lang3.StringUtils; + import org.apache.flink.api.common.accumulators.SerializedListAccumulator; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.configuration.Configuration; import java.io.IOException; import java.lang.reflect.Field; @@ -32,7 +34,6 @@ import java.lang.reflect.Modifier; import java.util.List; import java.util.Random; -import 
org.apache.flink.configuration.Configuration; import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis; /** @@ -63,19 +64,28 @@ public final class Utils { * * @param typeInfo {@link CompositeType} */ - public static void getContainedGenericTypes(CompositeType typeInfo, List<GenericTypeInfo<?>> target) { - for(int i = 0; i < typeInfo.getArity(); i++) { + public static void getContainedGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) { + for (int i = 0; i < typeInfo.getArity(); i++) { TypeInformation<?> type = typeInfo.getTypeAt(i); - if(type instanceof CompositeType) { - getContainedGenericTypes((CompositeType) type, target); - } else if(type instanceof GenericTypeInfo) { - if(!target.contains(type)) { + if (type instanceof CompositeType) { + getContainedGenericTypes((CompositeType<?>) type, target); + } else if (type instanceof GenericTypeInfo) { + if (!target.contains(type)) { target.add((GenericTypeInfo<?>) type); } } } } + // -------------------------------------------------------------------------------------------- + + /** + * Utility sink function that counts elements and writes the count into an accumulator, + * from which it can be retrieved by the client. This sink is used by the + * {@link DataSet#count()} function. + * + * @param <T> Type of elements to count. 
+ */ @SkipCodeAnalysis public static class CountHelper<T> extends RichOutputFormat<T> { @@ -90,24 +100,29 @@ public final class Utils { } @Override - public void configure(Configuration parameters) { - } + public void configure(Configuration parameters) {} @Override - public void open(int taskNumber, int numTasks) throws IOException { - } + public void open(int taskNumber, int numTasks) {} @Override - public void writeRecord(T record) throws IOException { + public void writeRecord(T record) { counter++; } @Override - public void close() throws IOException { + public void close() { getRuntimeContext().getLongCounter(id).add(counter); } } + /** + * Utility sink function that collects elements into an accumulator, + * from which they can be retrieved by the client. This sink is used by the + * {@link DataSet#collect()} function. + * + * @param <T> Type of elements to collect. + */ @SkipCodeAnalysis public static class CollectHelper<T> extends RichOutputFormat<T> { @@ -124,11 +139,10 @@ public final class Utils { } @Override - public void configure(Configuration parameters) { - } + public void configure(Configuration parameters) {} @Override - public void open(int taskNumber, int numTasks) throws IOException { + public void open(int taskNumber, int numTasks) { this.accumulator = new SerializedListAccumulator<>(); } @@ -138,13 +152,12 @@ public final class Utils { } @Override - public void close() throws IOException { + public void close() { // Important: should only be added in close method to minimize traffic of accumulators getRuntimeContext().addAccumulator(id, accumulator); } } - // -------------------------------------------------------------------------------------------- /** @@ -157,16 +170,16 @@ public final class Utils { private static <T> String getSerializerTree(TypeInformation<T> ti, int indent) { String ret = ""; - if(ti instanceof CompositeType) { + if (ti instanceof CompositeType) { ret += StringUtils.repeat(' ', indent) + 
ti.getClass().getSimpleName()+"\n"; CompositeType<T> cti = (CompositeType<T>) ti; String[] fieldNames = cti.getFieldNames(); - for(int i = 0; i < cti.getArity(); i++) { - TypeInformation fieldType = cti.getTypeAt(i); + for (int i = 0; i < cti.getArity(); i++) { + TypeInformation<?> fieldType = cti.getTypeAt(i); ret += StringUtils.repeat(' ', indent + 2) + fieldNames[i]+":"+getSerializerTree(fieldType, indent); } } else { - if(ti instanceof GenericTypeInfo) { + if (ti instanceof GenericTypeInfo) { ret += StringUtils.repeat(' ', indent) + "GenericTypeInfo ("+ti.getTypeClass().getSimpleName()+")\n"; ret += getGenericTypeTree(ti.getTypeClass(), indent + 4); } else { @@ -176,14 +189,15 @@ public final class Utils { return ret; } - private static String getGenericTypeTree(Class type, int indent) { + private static String getGenericTypeTree(Class<?> type, int indent) { String ret = ""; - for(Field field : type.getDeclaredFields()) { - if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { + for (Field field : type.getDeclaredFields()) { + if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) { continue; } - ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + (field.getType().isEnum() ? " (is enum)" : "") + "\n"; - if(!field.getType().isPrimitive()) { + ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + + (field.getType().isEnum() ? " (is enum)" : "") + "\n"; + if (!field.getType().isPrimitive()) { ret += getGenericTypeTree(field.getType(), indent + 4); } }