[hotfix] [tests] Fix failing tests in AsyncWaitOperatorTest and 
StateBackendTestBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61a45e4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61a45e4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61a45e4b

Branch: refs/heads/master
Commit: 61a45e4b2895905efcffd216789bf753fc9f5c56
Parents: 2d34af3
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 15:51:12 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 10 +++++-----
 .../streaming/util/AbstractStreamOperatorTestHarness.java |  5 ++++-
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/61a45e4b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 758e894..7cbfb15 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1063,7 +1063,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws RocksDBException
                 */
                public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
-                               throws IOException, ClassNotFoundException, 
RocksDBException {
+                               throws IOException, StateMigrationException, 
ClassNotFoundException, RocksDBException {
 
                        rocksDBKeyedStateBackend.createDB();
 
@@ -1089,7 +1089,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws ClassNotFoundException
                 */
                private void restoreKeyGroupsInStateHandle()
-                               throws IOException, RocksDBException, 
ClassNotFoundException {
+                               throws IOException, StateMigrationException, 
RocksDBException, ClassNotFoundException {
                        try {
                                currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
                                
rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
@@ -1111,7 +1111,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws ClassNotFoundException
                 * @throws RocksDBException
                 */
-               private void restoreKVStateMetaData() throws IOException, 
RocksDBException {
+               private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
 
                        KeyedBackendSerializationProxy<K> serializationProxy =
                                        new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
@@ -1128,7 +1128,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                .isRequiresMigration()) {
 
                                // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
-                               throw new RuntimeException("The new key 
serializer is not compatible to read previous keys. " +
+                               throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
                                        "Aborting now since state migration is 
currently not available");
                        }
 
@@ -1248,7 +1248,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        .isRequiresMigration()) {
 
                                        // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
-                                       throw new RuntimeException("The new key 
serializer is not compatible to read previous keys. " +
+                                       throw new StateMigrationException("The 
new key serializer is not compatible to read previous keys. " +
                                                "Aborting now since state 
migration is currently not available");
                                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/61a45e4b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index fd781f6..47e8726 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -186,7 +186,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
                when(mockTask.getEnvironment()).thenReturn(environment);
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-               
when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader());
+
+               ClassLoader cl = environment.getUserClassLoader();
+               when(mockTask.getUserCodeClassLoader()).thenReturn(cl);
+
                
when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
                
when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
 

Reply via email to