[hotfix] [tests] Fix failing tests in AsyncWaitOperatorTest and StateBackendTestBase
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/61a45e4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/61a45e4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/61a45e4b

Branch: refs/heads/master
Commit: 61a45e4b2895905efcffd216789bf753fc9f5c56
Parents: 2d34af3
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Sun Jun 4 15:51:12 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 06:38:17 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java | 10 +++++-----
 .../streaming/util/AbstractStreamOperatorTestHarness.java |  5 ++++-
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/61a45e4b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 758e894..7cbfb15 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1063,7 +1063,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws RocksDBException
 		 */
 		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-				throws IOException, ClassNotFoundException, RocksDBException {
+				throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException {
 
 			rocksDBKeyedStateBackend.createDB();
 
@@ -1089,7 +1089,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 */
 		private void restoreKeyGroupsInStateHandle()
-				throws IOException, RocksDBException, ClassNotFoundException {
+				throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException {
 			try {
 				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
 				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
@@ -1111,7 +1111,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 * @throws RocksDBException
 		 */
-		private void restoreKVStateMetaData() throws IOException, RocksDBException {
+		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
 
 			KeyedBackendSerializationProxy<K> serializationProxy =
 					new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
@@ -1128,7 +1128,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					.isRequiresMigration()) {
 
 				// TODO replace with state migration; note that key hash codes need to remain the same after migration
-				throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 					"Aborting now since state migration is currently not available");
 			}
 
@@ -1248,7 +1248,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					.isRequiresMigration()) {
 
 				// TODO replace with state migration; note that key hash codes need to remain the same after migration
-				throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+				throw new StateMigrationException("The new key serializer is not compatible to read previous keys. " +
 					"Aborting now since state migration is currently not available");
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/61a45e4b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index fd781f6..47e8726 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -186,7 +186,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(environment);
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader());
+
+		ClassLoader cl = environment.getUserClassLoader();
+		when(mockTask.getUserCodeClassLoader()).thenReturn(cl);
+
 		when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
 		when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
