Repository: flink
Updated Branches:
  refs/heads/master f12c591aa -> fa11845b9


[FLINK-6695] Activate strict checkstyle for flink-streaming-contrib

This closes #4004.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f8cacd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f8cacd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f8cacd5

Branch: refs/heads/master
Commit: 2f8cacd5e5482fdb3e9af94d8490110b9345d986
Parents: 2a2d984
Author: zentol <[email protected]>
Authored: Wed May 24 10:51:23 2017 +0200
Committer: zentol <[email protected]>
Committed: Thu Jun 1 11:14:11 2017 +0200

----------------------------------------------------------------------
 flink-contrib/flink-streaming-contrib/pom.xml   | 35 +++++++++++++++
 .../flink/contrib/streaming/CollectSink.java    | 18 ++++----
 .../contrib/streaming/DataStreamUtils.java      | 15 ++++---
 .../contrib/streaming/SocketStreamIterator.java | 45 ++++++++++----------
 .../flink/contrib/streaming/CollectITCase.java  | 14 +++---
 .../streaming/SocketStreamIteratorTest.java     | 31 ++++++++------
 6 files changed, 100 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/pom.xml b/flink-contrib/flink-streaming-contrib/pom.xml
index 2381d70..812043c 100644
--- a/flink-contrib/flink-streaming-contrib/pom.xml
+++ b/flink-contrib/flink-streaming-contrib/pom.xml
@@ -182,6 +182,41 @@ under the License.
                                        
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
                                </configuration>
                        </plugin>
+
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-checkstyle-plugin</artifactId>
+                               <version>2.17</version>
+                               <dependencies>
+                                       <dependency>
+                                               <groupId>com.puppycrawl.tools</groupId>
+                                               <artifactId>checkstyle</artifactId>
+                                               <version>6.19</version>
+                                       </dependency>
+                               </dependencies>
+                               <configuration>
+                                       <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+                                       <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+                                       <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                                       <logViolationsToConsole>true</logViolationsToConsole>
+                                       <failOnViolation>true</failOnViolation>
+                               </configuration>
+                               <executions>
+                                       <!--
+                                       Execute checkstyle after compilation but before tests.
+
+                                       This ensures that any parsing or type checking errors are from
+                                       javac, so they look as expected. Beyond that, we want to
+                                       fail as early as possible.
+                                       -->
+                                       <execution>
+                                               <phase>test-compile</phase>
+                                               <goals>
+                                                       <goal>check</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
index 161eb16..13127fe 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/CollectSink.java
@@ -17,27 +17,27 @@
 
 package org.apache.flink.contrib.streaming;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.InetAddress;
-
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
 
 /**
  * A specialized data sink to be used by DataStreamUtils.collect.
  */
 class CollectSink<IN> extends RichSinkFunction<IN> {
-       
+
        private static final long serialVersionUID = 1L;
 
        private final InetAddress hostIp;
        private final int port;
        private final TypeSerializer<IN> serializer;
-       
+
        private transient Socket client;
        private transient OutputStream outputStream;
        private transient DataOutputViewStreamWrapper streamWriter;
@@ -91,7 +91,7 @@ class CollectSink<IN> extends RichSinkFunction<IN> {
                                outputStream.flush();
                                outputStream.close();
                        }
-                       
+
                        // first regular attempt to cleanly close. Failing that 
will escalate
                        if (client != null) {
                                client.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
index 2987597..430c98c 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java
@@ -31,6 +31,9 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.Iterator;
 
+/**
+ * A collection of utilities for {@link DataStream DataStreams}.
+ */
 public final class DataStreamUtils {
 
        /**
@@ -38,19 +41,19 @@ public final class DataStreamUtils {
         * @return The iterator
         */
        public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) 
throws IOException {
-               
+
                TypeSerializer<OUT> serializer = 
stream.getType().createSerializer(
                                stream.getExecutionEnvironment().getConfig());
-               
+
                SocketStreamIterator<OUT> iter = new 
SocketStreamIterator<OUT>(serializer);
 
                //Find out what IP of us should be given to CollectSink, that 
it will be able to connect to
                StreamExecutionEnvironment env = 
stream.getExecutionEnvironment();
                InetAddress clientAddress;
-               
+
                if (env instanceof RemoteStreamEnvironment) {
-                       String host = ((RemoteStreamEnvironment)env).getHost();
-                       int port = ((RemoteStreamEnvironment)env).getPort();
+                       String host = ((RemoteStreamEnvironment) env).getHost();
+                       int port = ((RemoteStreamEnvironment) env).getPort();
                        try {
                                clientAddress = 
ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 
400);
                        }
@@ -73,7 +76,7 @@ public final class DataStreamUtils {
                sink.setParallelism(1); // It would not work if multiple 
instances would connect to the same port
 
                (new CallExecute(env, iter)).start();
-               
+
                return iter;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
index c65be85..fddfe4e 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/SocketStreamIterator.java
@@ -17,47 +17,46 @@
 
 package org.apache.flink.contrib.streaming;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+
+import java.io.EOFException;
+import java.io.IOException;
 import java.net.InetAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.util.Iterator;
-import java.net.ServerSocket;
-import java.io.IOException;
-import java.io.EOFException;
 import java.util.NoSuchElementException;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-
 /**
  * An iterator that returns the data from a socket stream.
- * 
+ *
  * <p>The iterator's constructor opens a server socket. In the first call to {@link #next()}
  * or {@link #hasNext()}, the iterator waits for a socket to connect, and starts receiving,
  * deserializing, and returning the data from that socket.
- * 
+ *
  * @param <T> The type of elements returned from the iterator.
  */
 class SocketStreamIterator<T> implements Iterator<T> {
 
-       /** Server socket to listen at */
+       /** Server socket to listen at. */
        private final ServerSocket socket;
 
-       /** Serializer to deserialize stream */
+       /** Serializer to deserialize stream. */
        private final TypeSerializer<T> serializer;
 
-       /** Set by the same thread that reads it */
+       /** Set by the same thread that reads it. */
        private DataInputViewStreamWrapper inStream;
 
-       /** Next element, handover from hasNext() to next() */
+       /** Next element, handover from hasNext() to next(). */
        private T next;
 
-       /** The socket for the specific stream */
+       /** The socket for the specific stream. */
        private Socket connectedSocket;
 
-       /** Async error, for example by the executor of the program that produces the stream */
+       /** Async error, for example by the executor of the program that produces the stream. */
        private volatile Throwable error;
 
-
        SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
                this.serializer = serializer;
                try {
@@ -79,18 +78,18 @@ class SocketStreamIterator<T> implements Iterator<T> {
        public int getPort() {
                return socket.getLocalPort();
        }
-       
+
        public InetAddress getBindAddress() {
                return socket.getInetAddress();
        }
-       
+
        public void close() {
                if (connectedSocket != null) {
                        try {
                                connectedSocket.close();
                        } catch (Throwable ignored) {}
                }
-               
+
                try {
                        socket.close();
                } catch (Throwable ignored) {}
@@ -114,7 +113,7 @@ class SocketStreamIterator<T> implements Iterator<T> {
                                throw new RuntimeException("Failed to receive 
next element: " + e.getMessage(), e);
                        }
                }
-               
+
                return next != null;
        }
 
@@ -145,18 +144,18 @@ class SocketStreamIterator<T> implements Iterator<T> {
                                connectedSocket = socket.accept();
                                inStream = new 
DataInputViewStreamWrapper(connectedSocket.getInputStream());
                        }
-                       
+
                        return serializer.deserialize(inStream);
                }
                catch (EOFException e) {
                        try {
                                connectedSocket.close();
                        } catch (Throwable ignored) {}
-                       
+
                        try {
                                socket.close();
                        } catch (Throwable ignored) {}
-                       
+
                        return null;
                }
                catch (Exception e) {
@@ -173,7 +172,7 @@ class SocketStreamIterator<T> implements Iterator<T> {
        // 
------------------------------------------------------------------------
        //  errors
        // 
------------------------------------------------------------------------
-       
+
        public void notifyOfError(Throwable error) {
                if (error != null && this.error == null) {
                        this.error = error;

http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
index f9b6a21..55a4df3 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
@@ -29,7 +29,7 @@ import org.junit.Test;
 
 import java.util.Iterator;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 /**
  * This test verifies the behavior of DataStreamUtils.collect.
@@ -45,18 +45,18 @@ public class CollectITCase extends TestLogger {
                        TestStreamEnvironment.setAsContext(cluster, 1);
 
                        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-       
-                       final long N = 10;
-                       DataStream<Long> stream = env.generateSequence(1, N);
-       
+
+                       final long n = 10;
+                       DataStream<Long> stream = env.generateSequence(1, n);
+
                        long i = 1;
                        for (Iterator<Long> it = 
DataStreamUtils.collect(stream); it.hasNext(); ) {
                                long x = it.next();
                                assertEquals("received wrong element", i, x);
                                i++;
                        }
-                       
-                       assertEquals("received wrong number of elements", N + 
1, i);
+
+                       assertEquals("received wrong number of elements", n + 
1, i);
                }
                finally {
                        TestStreamEnvironment.unsetAsContext();

http://git-wip-us.apache.org/repos/asf/flink/blob/2f8cacd5/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
index f8739a3..0693ce2 100644
--- 
a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
+++ 
b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/SocketStreamIteratorTest.java
@@ -28,31 +28,36 @@ import java.net.Socket;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the SocketStreamIterator.
+ */
 public class SocketStreamIteratorTest {
-       
+
        @Test
        public void testIterator() throws Exception {
-               
+
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
-               
+
                final long seed = new Random().nextLong();
                final int numElements = 1000;
-               
+
                final SocketStreamIterator<Long> iterator = new 
SocketStreamIterator<>(LongSerializer.INSTANCE);
-               
+
                Thread writer = new Thread() {
-                       
+
                        @Override
                        public void run() {
                                try {
                                        try (Socket sock = new 
Socket(iterator.getBindAddress(), iterator.getPort());
-                                               DataOutputViewStreamWrapper out 
= new DataOutputViewStreamWrapper(sock.getOutputStream()))
-                                       {
+                                               DataOutputViewStreamWrapper out 
= new DataOutputViewStreamWrapper(sock.getOutputStream())) {
+
                                                final TypeSerializer<Long> 
serializer = LongSerializer.INSTANCE;
                                                final Random rnd = new 
Random(seed);
-                                               
+
                                                for (int i = 0; i < 
numElements; i++) {
                                                        
serializer.serialize(rnd.nextLong(), out);
                                                }
@@ -63,16 +68,16 @@ public class SocketStreamIteratorTest {
                                }
                        }
                };
-               
+
                writer.start();
-               
+
                final Random validator = new Random(seed);
                for (int i = 0; i < numElements; i++) {
                        assertTrue(iterator.hasNext());
                        assertTrue(iterator.hasNext());
                        assertEquals(validator.nextLong(), 
iterator.next().longValue());
                }
-               
+
                assertFalse(iterator.hasNext());
                writer.join();
                assertFalse(iterator.hasNext());

Reply via email to