[FLINK-3466] [tests] Add serialization validation for state handles
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2837c603 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2837c603 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2837c603 Branch: refs/heads/master Commit: 2837c60384e94ff3407802a5dfd3b60aae1cb018 Parents: 41f5818 Author: Stephan Ewen <[email protected]> Authored: Thu Jul 14 15:14:12 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Fri Jul 15 17:18:04 2016 +0200 ---------------------------------------------------------------------- flink-tests/pom.xml | 8 ++ .../state/StateHandleSerializationTest.java | 95 ++++++++++++++++++++ 2 files changed, 103 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2837c603/flink-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml index 77216e0..7ced2d0 100644 --- a/flink-tests/pom.xml +++ b/flink-tests/pom.xml @@ -190,6 +190,14 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + + <!-- utility to scan classpaths --> + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + <version>0.9.10</version> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/2837c603/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java new file mode 100644 index 0000000..c7d9a42 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.state; + +import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.state.StateHandle; + +import org.junit.Test; + +import org.reflections.Reflections; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Set; + +import static org.junit.Assert.*; + +public class StateHandleSerializationTest { + + /** + * This test validates that all subclasses of {@link StateHandle} have a proper + * serial version UID. + */ + @Test + public void ensureStateHandlesHaveSerialVersionUID() { + try { + Reflections reflections = new Reflections("org.apache.flink"); + + // check all state handles + + @SuppressWarnings("unchecked") + Set<Class<?>> stateHandleImplementations = (Set<Class<?>>) (Set<?>) + reflections.getSubTypesOf(StateHandle.class); + + for (Class<?> clazz : stateHandleImplementations) { + validataSerialVersionUID(clazz); + } + + // check all key/value snapshots + + @SuppressWarnings("unchecked") + Set<Class<?>> kvStateSnapshotImplementations = (Set<Class<?>>) (Set<?>) + reflections.getSubTypesOf(KvStateSnapshot.class); + + System.out.println(kvStateSnapshotImplementations); + + for (Class<?> clazz : kvStateSnapshotImplementations) { + validataSerialVersionUID(clazz); + } + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private static void validataSerialVersionUID(Class<?> clazz) { + // all non-interface types must have a serial version UID + if (!clazz.isInterface()) { + assertFalse("Anonymous state handle classes have problematic serialization behavior", + clazz.isAnonymousClass()); + + try { + Field versionUidField = clazz.getDeclaredField("serialVersionUID"); + + // check conditions first via "if" to prevent always constructing expensive error messages + if (!(Modifier.isPrivate(versionUidField.getModifiers()) && + Modifier.isStatic(versionUidField.getModifiers()) && + Modifier.isFinal(versionUidField.getModifiers()))) + { + fail(clazz.getName() + " - serialVersionUID is not 'private static final'"); + } + } + catch (NoSuchFieldException e) { + fail("State handle implementation '" + clazz.getName() + "' is missing the serialVersionUID"); + } + } + } +}
