Repository: flink
Updated Branches:
  refs/heads/master f12c591aa -> fa11845b9
[FLINK-6695] Activate strict checkstyle for flink-streaming-contrib

This closes #4004.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f8cacd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f8cacd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f8cacd5

Branch: refs/heads/master
Commit: 2f8cacd5e5482fdb3e9af94d8490110b9345d986
Parents: 2a2d984
Author: zentol <[email protected]>
Authored: Wed May 24 10:51:23 2017 +0200
Committer: zentol <[email protected]>
Committed: Thu Jun 1 11:14:11 2017 +0200

----------------------------------------------------------------------
 flink-contrib/flink-streaming-contrib/pom.xml | 35 +++++++++++++++
 .../flink/contrib/streaming/CollectSink.java | 18 ++++----
 .../contrib/streaming/DataStreamUtils.java | 15 ++++---
 .../contrib/streaming/SocketStreamIterator.java | 45 ++++++++++----------
 .../flink/contrib/streaming/CollectITCase.java | 14 +++---
 .../streaming/SocketStreamIteratorTest.java | 31 ++++++++------
 6 files changed, 100 insertions(+), 58 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
index 2381d70..812043c 100644
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ b/flink-contrib/flink-streaming-contrib/pom.xml
@@ -182,6 +182,41 @@ under the License.
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation> </configuration> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.19</version> + </dependency> + </dependencies> + <configuration> + <configLocation>/tools/maven/strict-checkstyle.xml</configLocation> + <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <logViolationsToConsole>true</logViolationsToConsole> + <failOnViolation>true</failOnViolation> + </configuration> + <executions> + <!-- + Execute checkstyle after compilation but before tests. + + This ensures that any parsing or type checking errors are from + javac, so they look as expected. Beyond that, we want to + fail as early as possible. 
+ --> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java index 161eb16..13127fe 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java @@ -17,27 +17,27 @@ package org.apache.flink.contrib.streaming; -import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; -import java.net.InetAddress; - +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.api.common.typeutils.TypeSerializer; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; /** * A specialized data sink to be used by DataStreamUtils.collect. 
*/ class CollectSink<IN> extends RichSinkFunction<IN> { - + private static final long serialVersionUID = 1L; private final InetAddress hostIp; private final int port; private final TypeSerializer<IN> serializer; - + private transient Socket client; private transient OutputStream outputStream; private transient DataOutputViewStreamWrapper streamWriter; @@ -91,7 +91,7 @@ class CollectSink<IN> extends RichSinkFunction<IN> { outputStream.flush(); outputStream.close(); } - + // first regular attempt to cleanly close. Failing that will escalate if (client != null) { client.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java index 2987597..430c98c 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java @@ -31,6 +31,9 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Iterator; +/** + * A collection of utilities for {@link DataStream DataStreams}. 
+ */ public final class DataStreamUtils { /** @@ -38,19 +41,19 @@ public final class DataStreamUtils { * @return The iterator */ public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws IOException { - + TypeSerializer<OUT> serializer = stream.getType().createSerializer( stream.getExecutionEnvironment().getConfig()); - + SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer); //Find out what IP of us should be given to CollectSink, that it will be able to connect to StreamExecutionEnvironment env = stream.getExecutionEnvironment(); InetAddress clientAddress; - + if (env instanceof RemoteStreamEnvironment) { - String host = ((RemoteStreamEnvironment)env).getHost(); - int port = ((RemoteStreamEnvironment)env).getPort(); + String host = ((RemoteStreamEnvironment) env).getHost(); + int port = ((RemoteStreamEnvironment) env).getPort(); try { clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 400); } @@ -73,7 +76,7 @@ public final class DataStreamUtils { sink.setParallelism(1); // It would not work if multiple instances would connect to the same port (new CallExecute(env, iter)).start(); - + return iter; } http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java index c65be85..fddfe4e 100644 --- a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java +++ b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java @@ -17,47 +17,46 @@ package 
org.apache.flink.contrib.streaming; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; + +import java.io.EOFException; +import java.io.IOException; import java.net.InetAddress; +import java.net.ServerSocket; import java.net.Socket; import java.util.Iterator; -import java.net.ServerSocket; -import java.io.IOException; -import java.io.EOFException; import java.util.NoSuchElementException; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; - /** * An iterator that returns the data from a socket stream. - * + * * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()} * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving, * deserializing, and returning the data from that socket. - * + * * @param <T> The type of elements returned from the iterator. */ class SocketStreamIterator<T> implements Iterator<T> { - /** Server socket to listen at */ + /** Server socket to listen at. */ private final ServerSocket socket; - /** Serializer to deserialize stream */ + /** Serializer to deserialize stream. */ private final TypeSerializer<T> serializer; - /** Set by the same thread that reads it */ + /** Set by the same thread that reads it. */ private DataInputViewStreamWrapper inStream; - /** Next element, handover from hasNext() to next() */ + /** Next element, handover from hasNext() to next(). */ private T next; - /** The socket for the specific stream */ + /** The socket for the specific stream. */ private Socket connectedSocket; - /** Async error, for example by the executor of the program that produces the stream */ + /** Async error, for example by the executor of the program that produces the stream. 
*/ private volatile Throwable error; - SocketStreamIterator(TypeSerializer<T> serializer) throws IOException { this.serializer = serializer; try { @@ -79,18 +78,18 @@ class SocketStreamIterator<T> implements Iterator<T> { public int getPort() { return socket.getLocalPort(); } - + public InetAddress getBindAddress() { return socket.getInetAddress(); } - + public void close() { if (connectedSocket != null) { try { connectedSocket.close(); } catch (Throwable ignored) {} } - + try { socket.close(); } catch (Throwable ignored) {} @@ -114,7 +113,7 @@ class SocketStreamIterator<T> implements Iterator<T> { throw new RuntimeException("Failed to receive next element: " + e.getMessage(), e); } } - + return next != null; } @@ -145,18 +144,18 @@ class SocketStreamIterator<T> implements Iterator<T> { connectedSocket = socket.accept(); inStream = new DataInputViewStreamWrapper(connectedSocket.getInputStream()); } - + return serializer.deserialize(inStream); } catch (EOFException e) { try { connectedSocket.close(); } catch (Throwable ignored) {} - + try { socket.close(); } catch (Throwable ignored) {} - + return null; } catch (Exception e) { @@ -173,7 +172,7 @@ class SocketStreamIterator<T> implements Iterator<T> { // ------------------------------------------------------------------------ // errors // ------------------------------------------------------------------------ - + public void notifyOfError(Throwable error) { if (error != null && this.error == null) { this.error = error; http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java index f9b6a21..55a4df3 100644 --- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java @@ -29,7 +29,7 @@ import org.junit.Test; import java.util.Iterator; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; /** * This test verifies the behavior of DataStreamUtils.collect. @@ -45,18 +45,18 @@ public class CollectITCase extends TestLogger { TestStreamEnvironment.setAsContext(cluster, 1); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final long N = 10; - DataStream<Long> stream = env.generateSequence(1, N); - + + final long n = 10; + DataStream<Long> stream = env.generateSequence(1, n); + long i = 1; for (Iterator<Long> it = DataStreamUtils.collect(stream); it.hasNext(); ) { long x = it.next(); assertEquals("received wrong element", i, x); i++; } - - assertEquals("received wrong number of elements", N + 1, i); + + assertEquals("received wrong number of elements", n + 1, i); } finally { TestStreamEnvironment.unsetAsContext(); http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java index f8739a3..0693ce2 100644 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java +++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java @@ -28,31 +28,36 @@ import java.net.Socket; import java.util.Random; import 
java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +/** + * Tests for the SocketStreamIterator. + */ public class SocketStreamIteratorTest { - + @Test public void testIterator() throws Exception { - + final AtomicReference<Throwable> error = new AtomicReference<>(); - + final long seed = new Random().nextLong(); final int numElements = 1000; - + final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE); - + Thread writer = new Thread() { - + @Override public void run() { try { try (Socket sock = new Socket(iterator.getBindAddress(), iterator.getPort()); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sock.getOutputStream())) - { + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(sock.getOutputStream())) { + final TypeSerializer<Long> serializer = LongSerializer.INSTANCE; final Random rnd = new Random(seed); - + for (int i = 0; i < numElements; i++) { serializer.serialize(rnd.nextLong(), out); } @@ -63,16 +68,16 @@ public class SocketStreamIteratorTest { } } }; - + writer.start(); - + final Random validator = new Random(seed); for (int i = 0; i < numElements; i++) { assertTrue(iterator.hasNext()); assertTrue(iterator.hasNext()); assertEquals(validator.nextLong(), iterator.next().longValue()); } - + assertFalse(iterator.hasNext()); writer.join(); assertFalse(iterator.hasNext());
