Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5032#discussion_r152454242 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.java --- @@ -270,6 +271,20 @@ public void testMapStateReturnsEmptyMapByDefault() throws Exception { assertFalse(value.iterator().hasNext()); } + @Test(expected = DuplicateStateNameException.class) + public void testDuplicateStateName() throws Exception { + StreamingRuntimeContext context = new StreamingRuntimeContext( + createMapPlainMockOp(), + createMockEnvironment(), + Collections.emptyMap()); + MapStateDescriptor<Integer, String> mapStateDesc = + new MapStateDescriptor<>("name", Integer.class, String.class); + ListStateDescriptor<String> listStateDesc = + new ListStateDescriptor<>("name", String.class); + context.getMapState(mapStateDesc); --- End diff -- Actually, the test is quite tricky here (internally, `getListState()` will fetch a `null` value instead of the `MapState` created before). It will not simulate the real runtime behavior, which erasures the return type for `DefaultKeyedStateStore.getPartitionedState()`, since the test mocks a `KeyedStateBackend`. The type will be checked in advance and that's why I need to catch-and-throw the `ClassCastException` in `getPartitionedState()`. However, I cannot find a better place for this test. Do you have some suggestions?
---