[FLINK-6432] [py] Activate strict checkstyle for flink-python This closes #3969.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04fae536 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04fae536 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04fae536 Branch: refs/heads/master Commit: 04fae5362bdc933c111c92a9cc3b3b2c1d71850e Parents: ce573c6 Author: zentol <[email protected]> Authored: Tue May 2 18:03:48 2017 +0200 Committer: zentol <[email protected]> Committed: Wed May 24 16:46:03 2017 +0200 ---------------------------------------------------------------------- flink-libraries/flink-python/pom.xml | 35 +++++++++++++++ .../flink/python/api/PythonOperationInfo.java | 14 ++++-- .../apache/flink/python/api/PythonOptions.java | 5 ++- .../flink/python/api/PythonPlanBinder.java | 6 ++- .../python/api/functions/PythonCoGroup.java | 6 ++- .../api/functions/PythonMapPartition.java | 4 +- .../api/functions/util/IdentityGroupReduce.java | 10 +++-- .../python/api/functions/util/KeyDiscarder.java | 7 +-- .../api/functions/util/NestedKeyDiscarder.java | 8 ++-- .../api/functions/util/SerializerMap.java | 7 +-- .../functions/util/StringDeserializerMap.java | 7 +-- .../util/StringTupleDeserializerMap.java | 7 +-- .../streaming/data/PythonDualInputSender.java | 1 + .../streaming/data/PythonDualInputStreamer.java | 1 + .../api/streaming/data/PythonReceiver.java | 13 +++--- .../python/api/streaming/data/PythonSender.java | 19 +++++++-- .../streaming/data/PythonSingleInputSender.java | 1 + .../data/PythonSingleInputStreamer.java | 1 + .../api/streaming/data/PythonStreamer.java | 2 + .../data/SingleElementPushBackIterator.java | 1 + .../api/streaming/plan/PythonPlanReceiver.java | 12 +++--- .../api/streaming/plan/PythonPlanSender.java | 4 +- .../api/streaming/plan/PythonPlanStreamer.java | 10 +++-- .../api/streaming/util/SerializationUtils.java | 45 ++++++++++++++++++++ .../api/streaming/util/StreamPrinter.java | 1 + .../python/api/types/CustomTypeWrapper.java | 1 + 
.../apache/flink/python/api/util/SetCache.java | 1 + .../flink/python/api/PythonPlanBinderTest.java | 6 ++- .../data/SingleElementPushBackIteratorTest.java | 4 ++ 29 files changed, 189 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/pom.xml b/flink-libraries/flink-python/pom.xml index 6a57c3e..f6cea0c 100644 --- a/flink-libraries/flink-python/pom.xml +++ b/flink-libraries/flink-python/pom.xml @@ -48,6 +48,41 @@ under the License. </archive> </configuration> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.19</version> + </dependency> + </dependencies> + <configuration> + <configLocation>/tools/maven/strict-checkstyle.xml</configLocation> + <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <logViolationsToConsole>true</logViolationsToConsole> + <failOnViolation>true</failOnViolation> + </configuration> + <executions> + <!-- + Execute checkstyle after compilation but before tests. + + This ensures that any parsing or type checking errors are from + javac, so they look as expected. Beyond that, we want to + fail as early as possible. 
+ --> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java index 694c1b4..42eeffb 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java @@ -10,17 +10,23 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api; -import java.io.IOException; -import java.util.Arrays; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; -import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer; +import java.io.IOException; +import java.util.Arrays; + +import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; + +/** + * Generic container for all information required to an operation to the DataSet API. 
+ */ public class PythonOperationInfo { public String identifier; public int parentID; //DataSet that an operation is applied on @@ -121,7 +127,7 @@ public class PythonOperationInfo { return sb.toString(); } - public enum DatasizeHint { + enum DatasizeHint { NONE, TINY, HUGE http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java index de053a0..4137c11 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.python.api; import org.apache.flink.configuration.ConfigOption; @@ -40,8 +41,8 @@ public class PythonOptions { * The config parameter defining the size of the memory-mapped files, in kb. * This value must be large enough to ensure that the largest serialized record can be written completely into * the file. - * - * Every task will allocate 2 memory-files, each with this size. + * + * <p>Every task will allocate 2 memory-files, each with this size. 
*/ public static final ConfigOption<Long> MMAP_FILE_SIZE = key("python.mmap.size.kb") http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 2378d60..810c8cd 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api; import org.apache.flink.api.common.JobExecutionResult; @@ -44,6 +45,7 @@ import org.apache.flink.python.api.functions.util.StringTupleDeserializerMap; import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer; import org.apache.flink.python.api.util.SetCache; import org.apache.flink.runtime.filecache.FileCache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,9 +106,9 @@ public class PythonPlanBinder { tmpPlanFilesDir = configuredPlanTmpPath != null ? 
configuredPlanTmpPath : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + UUID.randomUUID(); - + tmpDistributedDir = new Path(globalConfig.getString(PythonOptions.DC_TMP_DIR)); - + String flinkRootDir = System.getenv("FLINK_ROOT_DIR"); pythonLibraryPath = flinkRootDir != null //command-line http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java index a5e3e75..eaec88525 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java @@ -10,16 +10,18 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions; +import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer; import org.apache.flink.util.Collector; + import java.io.IOException; -import org.apache.flink.api.common.functions.RichCoGroupFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; /** * CoGroupFunction that uses a python script. 
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java index 207ead9..924538e 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java @@ -10,9 +10,9 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions; -import java.io.IOException; import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -21,6 +21,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.python.api.streaming.data.PythonSingleInputStreamer; import org.apache.flink.util.Collector; +import java.io.IOException; + /** * Multi-purpose class, usable by all operations using a python script with one input source and possibly differing * in-/output types. 
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java index 32fd22a..6d82e34 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/IdentityGroupReduce.java @@ -10,15 +10,17 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions.util; +import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.util.Collector; -import org.apache.flink.api.common.functions.GroupReduceFunction; -/* -Utility function to group and sort data. -*/ +/** + * Utility function to group and sort data. 
+ * @param <IN> input type + */ @ForwardedFields("*->*") public class IdentityGroupReduce<IN> implements GroupReduceFunction<IN, IN> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java index b2af7be..985eee5 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/KeyDiscarder.java @@ -10,15 +10,16 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions.util; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple2; -/* -Utility function to extract the value from a Key-Value Tuple. -*/ +/** + * Utility function to extract the value from a Key-Value Tuple. 
+ */ @ForwardedFields("f1->*") public class KeyDiscarder <T> implements MapFunction<Tuple2<T, byte[]>, byte[]> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java index 4c8511e..a2c9e52 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/NestedKeyDiscarder.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions.util; import org.apache.flink.api.common.functions.MapFunction; @@ -17,9 +18,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; -/* -Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin. -*/ +/** + * Utility function to extract values from 2 Key-Value Tuples after a DefaultJoin. 
+ * @param <IN> input type + */ @ForwardedFields("f0.f1->f0; f1.f1->f1") public class NestedKeyDiscarder<IN> implements MapFunction<IN, Tuple2<byte[], byte[]>> { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java index 116efd4..aaa7544 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/SerializerMap.java @@ -10,15 +10,16 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions.util; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.python.api.streaming.util.SerializationUtils; import org.apache.flink.python.api.streaming.util.SerializationUtils.Serializer; -/* -Utility function to serialize values, usually directly from data sources. -*/ +/** + * Utility function to serialize values, usually directly from data sources. 
+ */ public class SerializerMap<IN> implements MapFunction<IN, byte[]> { private transient Serializer<IN> serializer; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java index 3d79b08..c72b4be 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringDeserializerMap.java @@ -10,14 +10,15 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions.util; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.configuration.ConfigConstants; -/* -Utility function to deserialize strings, used for non-CSV sinks. -*/ +/** + * Utility function to deserialize strings, used for non-CSV sinks. 
+ */ public class StringDeserializerMap implements MapFunction<byte[], String> { @Override public String map(byte[] value) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java index af5eac6..48082ac 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/util/StringTupleDeserializerMap.java @@ -10,15 +10,16 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.functions.util; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.ConfigConstants; -/* -Utility function to deserialize strings, used for CSV sinks. -*/ +/** + * Utility function to deserialize strings, used for CSV sinks. 
+ */ public class StringTupleDeserializerMap implements MapFunction<byte[], Tuple1<String>> { @Override public Tuple1<String> map(byte[] value) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java index a16f522..de129b1 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.python.api.streaming.data; import org.apache.flink.configuration.Configuration; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java index 8c9fde9..bc3e546 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.common.functions.AbstractRichFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java index c7c1f7a..d8436c7 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java @@ -10,20 +10,21 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.streaming.data; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.python.api.PythonOptions; +import org.apache.flink.util.Collector; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.io.Serializable; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple2; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.python.api.PythonOptions; -import org.apache.flink.util.Collector; /** * This class is used to read data from memory-mapped files. http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java index 3d13271..2739fb1 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
*/ + package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.java.tuple.Tuple; @@ -41,7 +42,7 @@ public abstract class PythonSender implements Serializable { private transient MappedByteBuffer fileBuffer; private final long mappedFileSizeBytes; - + private final Configuration config; protected PythonSender(Configuration config) { @@ -59,7 +60,6 @@ public abstract class PythonSender implements Serializable { outputFile.createNewFile(); outputRAF = new RandomAccessFile(outputFile, "rw"); - outputRAF.setLength(mappedFileSizeBytes); outputRAF.seek(mappedFileSizeBytes - 1); outputRAF.writeByte(0); @@ -125,16 +125,29 @@ public abstract class PythonSender implements Serializable { throw new IllegalArgumentException("This object can't be serialized: " + value); } + /** + * Interface for all serializers used by {@link PythonSender} classes to write container objects. + * + * <p>These serializers must be kept in sync with the python counterparts. + * + * @param <T> input type + */ protected abstract static class Serializer<T> { protected ByteBuffer buffer; + /** + * Serializes the given value into a {@link ByteBuffer}. 
+ * + * @param value value to serialize + * @return ByteBuffer containing serialized record + */ public ByteBuffer serialize(T value) { serializeInternal(value); buffer.flip(); return buffer; } - public abstract void serializeInternal(T value); + protected abstract void serializeInternal(T value); } private static class ArraySerializer extends Serializer<byte[]> { http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java index 74d0604..af093f1 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.python.api.streaming.data; import org.apache.flink.configuration.Configuration; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java index 6c0a13c..d596c39 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.common.functions.AbstractRichFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index cc4ba43..3fec947 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.common.functions.AbstractRichFunction; @@ -20,6 +21,7 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializ import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer; import org.apache.flink.python.api.streaming.util.StreamPrinter; import org.apache.flink.util.ExceptionUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java index ef80c98..ea10b1d 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.python.api.streaming.data; import org.apache.flink.util.Preconditions; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java index 6276302..a90f581 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java @@ -10,12 +10,17 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
*/ + package org.apache.flink.python.api.streaming.plan; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.python.api.types.CustomTypeWrapper; + import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import org.apache.flink.api.java.tuple.Tuple; + import static org.apache.flink.python.api.streaming.data.PythonReceiver.createTuple; import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BOOLEAN; import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_BYTE; @@ -27,9 +32,6 @@ import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_NULL; import static org.apache.flink.python.api.streaming.util.SerializationUtils.TYPE_STRING; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.python.api.types.CustomTypeWrapper; - /** * Instances of this class can be used to receive data from the plan process. 
*/ @@ -82,7 +84,7 @@ public class PythonPlanReceiver { } private abstract static class Deserializer<T> { - + public T deserialize() throws IOException { return deserialize(false); } http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java index 331c67e..db78b8c 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java @@ -10,12 +10,14 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.streaming.plan; +import org.apache.flink.python.api.streaming.util.SerializationUtils; + import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; -import org.apache.flink.python.api.streaming.util.SerializationUtils; /** * Instances of this class can be used to send data to the plan process. 
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java index 9e93dda..d25f3d5 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java @@ -10,12 +10,14 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.streaming.plan; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.python.api.PythonOptions; import org.apache.flink.python.api.streaming.util.StreamPrinter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +42,7 @@ public class PythonPlanStreamer { private Process process; private ServerSocket server; private Socket socket; - + public PythonPlanStreamer(Configuration config) { this.config = config; } @@ -90,7 +92,7 @@ public class PythonPlanStreamer { return false; } while (true) { - try { + try { socket = server.accept(); sender = new PythonPlanSender(socket.getOutputStream()); receiver = new PythonPlanReceiver(socket.getInputStream()); @@ -107,7 +109,7 @@ public class PythonPlanStreamer { } } } - + public void finishPlanMode() { try { socket.close(); @@ -143,7 +145,7 @@ public class PythonPlanStreamer { return ProcessState.RUNNING; } } - + private enum ProcessState { RUNNING, FAILED, 
http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java index 721746a..1fe3ba1 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/SerializationUtils.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.streaming.util; import org.apache.flink.api.java.tuple.Tuple; @@ -19,6 +20,9 @@ import org.apache.flink.python.api.types.CustomTypeWrapper; import java.nio.ByteBuffer; import java.util.ArrayList; +/** + * Utility class containing serializers for all supported types. + */ public class SerializationUtils { public static final byte TYPE_BOOLEAN = 34; public static final byte TYPE_BYTE = 33; @@ -85,6 +89,14 @@ public class SerializationUtils { return (Serializer<IN>) serializer; } + /** + * Super class for all serializers used to serialize data. These serializers are used to serialize values emitted + * from java input formats. + * + * <p>These serializers must be kept in sync with the python counterparts. + * + * @param <IN> input type + */ public abstract static class Serializer<IN> { private byte[] typeInfo = null; @@ -109,6 +121,9 @@ public class SerializationUtils { } } + /** + * A serializer for {@link CustomTypeWrapper CustomTypeWrappers}. 
+ */ public static class CustomTypeWrapperSerializer extends Serializer<CustomTypeWrapper> { private final byte type; @@ -129,6 +144,9 @@ public class SerializationUtils { } } + /** + * A serializer for bytes. + */ public static class ByteSerializer extends Serializer<Byte> { @Override public byte[] serializeWithoutTypeInfo(Byte value) { @@ -141,6 +159,9 @@ public class SerializationUtils { } } + /** + * A serializer for booleans. + */ public static class BooleanSerializer extends Serializer<Boolean> { @Override public byte[] serializeWithoutTypeInfo(Boolean value) { @@ -153,6 +174,9 @@ public class SerializationUtils { } } + /** + * A serializer for ints. + */ public static class IntSerializer extends Serializer<Integer> { @Override public byte[] serializeWithoutTypeInfo(Integer value) { @@ -167,6 +191,9 @@ public class SerializationUtils { } } + /** + * A serializer for longs. + */ public static class LongSerializer extends Serializer<Long> { @Override public byte[] serializeWithoutTypeInfo(Long value) { @@ -181,6 +208,9 @@ public class SerializationUtils { } } + /** + * A serializer for strings. + */ public static class StringSerializer extends Serializer<String> { @Override public byte[] serializeWithoutTypeInfo(String value) { @@ -196,6 +226,9 @@ public class SerializationUtils { } } + /** + * A serializer for floats. + */ public static class FloatSerializer extends Serializer<Float> { @Override public byte[] serializeWithoutTypeInfo(Float value) { @@ -210,6 +243,9 @@ public class SerializationUtils { } } + /** + * A serializer for doubles. + */ public static class DoubleSerializer extends Serializer<Double> { @Override public byte[] serializeWithoutTypeInfo(Double value) { @@ -224,6 +260,9 @@ public class SerializationUtils { } } + /** + * A serializer for null. 
+ */ public static class NullSerializer extends Serializer<Object> { @Override public byte[] serializeWithoutTypeInfo(Object value) { @@ -236,6 +275,9 @@ public class SerializationUtils { } } + /** + * A serializer for byte arrays. + */ public static class BytesSerializer extends Serializer<byte[]> { @Override public byte[] serializeWithoutTypeInfo(byte[] value) { @@ -250,6 +292,9 @@ public class SerializationUtils { } } + /** + * A serializer for tuples. + */ public static class TupleSerializer extends Serializer<Tuple> { private final Serializer<Object>[] serializer; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java index c6a1ede..313b983 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. 
*/ + package org.apache.flink.python.api.streaming.util; import org.apache.flink.configuration.ConfigConstants; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java index e16c3eb..f5674d5 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/types/CustomTypeWrapper.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api.types; /** http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java index 750ba63..48011fc 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/util/SetCache.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.python.api.util; import org.apache.flink.api.java.DataSet; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index 20f3503..701ac73 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -10,6 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.python.api; import org.apache.flink.configuration.Configuration; @@ -23,8 +24,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +/** + * Tests for the PythonPlanBinder. 
+ */ public class PythonPlanBinderTest extends JavaProgramTestBase { - + @Override protected boolean skipCollectionExecution() { return true; http://git-wip-us.apache.org/repos/asf/flink/blob/04fae536/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java index 5e9eb42..a41aac3 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java +++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.python.api.streaming.data; import org.junit.Assert; @@ -24,6 +25,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +/** + * Tests for the SingleElementPushBackIterator. + */ public class SingleElementPushBackIteratorTest { @Test
