[FLINK-6803] [tests] Enhancements to PojoSerializerUpgradeTest

1. Allow tests to ignore missing fields.
2. Add equivalent tests which use POJOs as managed operator state.

For 2, all tests have to be ignored for now until FLINK-6804 is fixed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c157d62
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c157d62
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c157d62

Branch: refs/heads/master
Commit: 7c157d624e38513055662491b3b13b4ceb7d3001
Parents: 61a45e4
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 19:32:53 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../PojoSerializerUpgradeTest.java              | 210 +++++++++++++++----
 1 file changed, 164 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c157d62/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
index 2769c50..a925d43 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -46,6 +47,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
@@ -169,21 +171,30 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
                "@Override public String toString() {return \"(\" + a + 
\")\";}}";
 
        /**
-        * We should be able to handle a changed field order
+        * We should be able to handle a changed field order of a POJO as keyed 
state
         */
        @Test
-       public void testChangedFieldOrder() throws Exception {
-               testPojoSerializerUpgrade(SOURCE_A, SOURCE_B);
+       public void testChangedFieldOrderWithKeyedState() throws Exception {
+               testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, true);
        }
 
        /**
-        * Changing field types should require a state migration
+        * We should be able to handle a changed field order of a POJO as 
operator state
         */
+       @Ignore("Ignore this test until FLINK-6804 has been fixed.")
        @Test
-       public void testChangedFieldTypes() throws Exception {
+       public void testChangedFieldOrderWithOperatorState() throws Exception {
+               testPojoSerializerUpgrade(SOURCE_A, SOURCE_B, true, false);
+       }
+
+       /**
+        * Changing field types of a POJO as keyed state should require a state 
migration
+        */
+       @Test
+       public void testChangedFieldTypesWithKeyedState() throws Exception {
                assumeTrue("Running only for RocksDBStateBackend until 
FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
                try {
-                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_C);
+                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, 
true);
                        fail("Expected a state migration exception.");
                } catch (Exception e) {
                        if (CommonTestUtils.containsCause(e, 
StateMigrationException.class)) {
@@ -195,13 +206,31 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
        }
 
        /**
-        * Adding fields should require a state migration
+        * Changing field types of a POJO as operator state should require a 
state migration
         */
+       @Ignore("Ignore this test until FLINK-6804 has been fixed.")
        @Test
-       public void testAdditionalField() throws Exception {
+       public void testChangedFieldTypesWithOperatorState() throws Exception {
+               try {
+                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_C, true, 
false);
+                       fail("Expected a state migration exception.");
+               } catch (Exception e) {
+                       if (CommonTestUtils.containsCause(e, 
StateMigrationException.class)) {
+                               // StateMigrationException expected
+                       } else {
+                               throw e;
+                       }
+               }
+       }
+
+       /**
+        * Adding fields to a POJO as keyed state should require a state 
migration
+        */
+       @Test
+       public void testAdditionalFieldWithKeyedState() throws Exception {
                assumeTrue("Running only for RocksDBStateBackend until 
FLINK-6804 has been fixed.", stateBackend instanceof RocksDBStateBackend);
                try {
-                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_D);
+                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, 
true);
                        fail("Expected a state migration exception.");
                } catch (Exception e) {
                        if (CommonTestUtils.containsCause(e, 
StateMigrationException.class)) {
@@ -213,13 +242,49 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
        }
 
        /**
-        * Removing fields should require a state migration
+        * Adding fields to a POJO as operator state should require a state 
migration
+        */
+       @Ignore("Ignore this test until FLINK-6804 has been fixed.")
+       @Test
+       public void testAdditionalFieldWithOperatorState() throws Exception {
+               try {
+                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, 
false);
+                       fail("Expected a state migration exception.");
+               } catch (Exception e) {
+                       if (CommonTestUtils.containsCause(e, 
StateMigrationException.class)) {
+                               // StateMigrationException expected
+                       } else {
+                               throw e;
+                       }
+               }
+       }
+
+       /**
+        * Removing fields from a POJO as keyed state should require a state 
migration
         */
        @Ignore("Ignore this test until FLINK-6801 has been fixed.")
        @Test
-       public void testMissingField() throws Exception {
+       public void testMissingFieldWithKeyedState() throws Exception {
+               try {
+                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, 
true);
+                       fail("Expected a state migration exception.");
+               } catch (Exception e) {
+                       if (CommonTestUtils.containsCause(e, 
StateMigrationException.class)) {
+                               // StateMigrationException expected
+                       } else {
+                               throw e;
+                       }
+               }
+       }
+
+       /**
+        * Removing fields from a POJO as operator state should require a state 
migration
+        */
+       @Ignore("Ignore this test until FLINK-6804 has been fixed.")
+       @Test
+       public void testMissingFieldWithOperatorState() throws Exception {
                try {
-                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_E);
+                       testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, 
false);
                        fail("Expected a state migration exception.");
                } catch (Exception e) {
                        if (CommonTestUtils.containsCause(e, 
StateMigrationException.class)) {
@@ -230,7 +295,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                }
        }
 
-       public void testPojoSerializerUpgrade(String classSourceA, String 
classSourceB) throws Exception {
+       public void testPojoSerializerUpgrade(String classSourceA, String 
classSourceB, boolean hasBField, boolean isKeyedState) throws Exception {
                final Configuration taskConfiguration = new Configuration();
                final ExecutionConfig executionConfig = new ExecutionConfig();
                final KeySelector<Long, Long> keySelector = new 
IdentityKeySelector<>();
@@ -248,8 +313,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                OperatorStateHandles stateHandles = runOperator(
                        taskConfiguration,
                        executionConfig,
-                       new StreamMap<>(new StatefulMapper(true, false)),
+                       new StreamMap<>(new StatefulMapper(isKeyedState, false, 
hasBField)),
                        keySelector,
+                       isKeyedState,
                        stateBackend,
                        classLoader,
                        null,
@@ -268,8 +334,9 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                runOperator(
                        taskConfiguration,
                        executionConfig,
-                       new StreamMap<>(new StatefulMapper(true, true)),
+                       new StreamMap<>(new StatefulMapper(isKeyedState, true, 
hasBField)),
                        keySelector,
+                       isKeyedState,
                        stateBackend,
                        classLoaderB,
                        stateHandles,
@@ -281,6 +348,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
                        ExecutionConfig executionConfig,
                        OneInputStreamOperator<Long, Long> operator,
                        KeySelector<Long, Long> keySelector,
+                       boolean isKeyedState,
                        StateBackend stateBackend,
                        ClassLoader classLoader,
                        OperatorStateHandles operatorStateHandles,
@@ -298,11 +366,17 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
                        0,
                        classLoader);
 
-               final KeyedOneInputStreamOperatorTestHarness<Long, Long, Long> 
harness = new KeyedOneInputStreamOperatorTestHarness<>(
-                       operator,
-                       keySelector,
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       environment);
+               OneInputStreamOperatorTestHarness<Long, Long> harness;
+
+               if (isKeyedState) {
+                       harness = new KeyedOneInputStreamOperatorTestHarness<>(
+                               operator,
+                               keySelector,
+                               BasicTypeInfo.LONG_TYPE_INFO,
+                               environment);
+               } else {
+                       harness = new 
OneInputStreamOperatorTestHarness<>(operator, LongSerializer.INSTANCE, 
environment);
+               }
 
                harness.setStateBackend(stateBackend);
 
@@ -350,18 +424,26 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
 
                private final boolean keyed;
                private final boolean verify;
+               private final boolean hasBField;
+
+               // keyed states
+               private transient ValueState<Object> keyedValueState;
+               private transient MapState<Object, Object> keyedMapState;
+               private transient ListState<Object> keyedListState;
+               private transient ReducingState<Object> keyedReducingState;
+
+               // operator states
+               private transient ListState<Object> partitionableListState;
+               private transient ListState<Object> unionListState;
 
-               private transient ValueState<Object> valueState;
-               private transient MapState<Object, Object> mapState;
-               private transient ListState<Object> listState;
-               private transient ReducingState<Object> reducingState;
                private transient Class<?> pojoClass;
                private transient Field fieldA;
                private transient Field fieldB;
 
-               public StatefulMapper(boolean keyed, boolean verify) {
+               public StatefulMapper(boolean keyed, boolean verify, boolean 
hasBField) {
                        this.keyed = keyed;
                        this.verify = verify;
+                       this.hasBField = hasBField;
                }
 
                @Override
@@ -369,30 +451,54 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
                        Object pojo = pojoClass.newInstance();
 
                        fieldA.set(pojo, value);
-                       fieldB.set(pojo, value + "");
+
+                       if (hasBField) {
+                               fieldB.set(pojo, value + "");
+                       }
 
                        if (verify) {
-                               assertEquals(pojo, valueState.value());
+                               if (keyed) {
+                                       assertEquals(pojo, 
keyedValueState.value());
 
-                               assertTrue(mapState.contains(pojo));
-                               assertEquals(pojo, mapState.get(pojo));
+                                       
assertTrue(keyedMapState.contains(pojo));
+                                       assertEquals(pojo, 
keyedMapState.get(pojo));
 
-                               Iterator<Object> listIterator = 
listState.get().iterator();
+                                       Iterator<Object> listIterator = 
keyedListState.get().iterator();
 
-                               boolean elementFound = false;
+                                       boolean elementFound = false;
 
-                               while(listIterator.hasNext()) {
-                                       elementFound |= 
pojo.equals(listIterator.next());
-                               }
+                                       while (listIterator.hasNext()) {
+                                               elementFound |= 
pojo.equals(listIterator.next());
+                                       }
+
+                                       assertTrue(elementFound);
 
-                               assertTrue(elementFound);
+                                       assertEquals(pojo, 
keyedReducingState.get());
+                               } else {
+                                       boolean elementFound = false;
+                                       Iterator<Object> listIterator = 
partitionableListState.get().iterator();
+                                       while (listIterator.hasNext()) {
+                                               elementFound |= 
pojo.equals(listIterator.next());
+                                       }
+                                       assertTrue(elementFound);
 
-                               assertEquals(pojo, reducingState.get());
+                                       elementFound = false;
+                                       listIterator = 
unionListState.get().iterator();
+                                       while (listIterator.hasNext()) {
+                                               elementFound |= 
pojo.equals(listIterator.next());
+                                       }
+                                       assertTrue(elementFound);
+                               }
                        } else {
-                               valueState.update(pojo);
-                               mapState.put(pojo, pojo);
-                               listState.add(pojo);
-                               reducingState.add(pojo);
+                               if (keyed) {
+                                       keyedValueState.update(pojo);
+                                       keyedMapState.put(pojo, pojo);
+                                       keyedListState.add(pojo);
+                                       keyedReducingState.add(pojo);
+                               } else {
+                                       partitionableListState.add(pojo);
+                                       unionListState.add(pojo);
+                               }
                        }
 
                        return value;
@@ -408,17 +514,29 @@ public class PojoSerializerUpgradeTest extends TestLogger 
{
                        pojoClass = 
getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);
 
                        fieldA = pojoClass.getDeclaredField("a");
-                       fieldB = pojoClass.getDeclaredField("b");
                        fieldA.setAccessible(true);
-                       fieldB.setAccessible(true);
+
+                       if (hasBField) {
+                               fieldB = pojoClass.getDeclaredField("b");
+                               fieldB.setAccessible(true);
+                       }
 
                        if (keyed) {
-                               valueState = 
context.getKeyedStateStore().getState(new ValueStateDescriptor<>("valueState", 
(Class<Object>) pojoClass));
-                               mapState = 
context.getKeyedStateStore().getMapState(new MapStateDescriptor<>("mapState", 
(Class<Object>) pojoClass, (Class<Object>) pojoClass));
-                               listState = 
context.getKeyedStateStore().getListState(new 
ListStateDescriptor<>("listState", (Class<Object>) pojoClass));
+                               keyedValueState = 
context.getKeyedStateStore().getState(
+                                       new 
ValueStateDescriptor<>("keyedValueState", (Class<Object>) pojoClass));
+                               keyedMapState = 
context.getKeyedStateStore().getMapState(
+                                       new 
MapStateDescriptor<>("keyedMapState", (Class<Object>) pojoClass, 
(Class<Object>) pojoClass));
+                               keyedListState = 
context.getKeyedStateStore().getListState(
+                                       new 
ListStateDescriptor<>("keyedListState", (Class<Object>) pojoClass));
 
                                ReduceFunction<Object> reduceFunction = new 
FirstValueReducer<>();
-                               reducingState = 
context.getKeyedStateStore().getReducingState(new 
ReducingStateDescriptor<>("reducingState", reduceFunction, (Class<Object>) 
pojoClass));
+                               keyedReducingState = 
context.getKeyedStateStore().getReducingState(
+                                       new 
ReducingStateDescriptor<>("keyedReducingState", reduceFunction, (Class<Object>) 
pojoClass));
+                       } else {
+                               partitionableListState = 
context.getOperatorStateStore().getListState(
+                                       new 
ListStateDescriptor<>("partitionableListState", (Class<Object>) pojoClass));
+                               unionListState = 
context.getOperatorStateStore().getUnionListState(
+                                       new 
ListStateDescriptor<>("unionListState", (Class<Object>) pojoClass));
                        }
                }
        }

Reply via email to