Repository: flink Updated Branches: refs/heads/master 7173774d0 -> 947c44e86
[FLINK-6565] Fail memory-backed state restores with meaningful message if previous serializer is unavailable This closes #3882. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c594af09 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c594af09 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c594af09 Branch: refs/heads/master Commit: c594af09767e2ef1e74dd8db187985460761b724 Parents: 7173774 Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Authored: Fri May 12 19:11:25 2017 +0800 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Sat May 13 14:37:49 2017 +0800 ---------------------------------------------------------------------- .../state/DefaultOperatorStateBackend.java | 17 +++ ...ckendStateMetaInfoSnapshotReaderWriters.java | 8 ++ ...ckendStateMetaInfoSnapshotReaderWriters.java | 6 + .../state/heap/HeapKeyedStateBackend.java | 17 +++ .../runtime/state/MemoryStateBackendTest.java | 135 +++++++++++++++++++ .../runtime/state/OperatorStateBackendTest.java | 70 +++++++++- .../runtime/state/StateBackendTestBase.java | 2 +- 7 files changed, 251 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java index ab0c1f0..1d3af72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; @@ -293,6 +294,22 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend { // Recreate all PartitionableListStates from the meta info for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : restoredMetaInfoSnapshots) { + + if (restoredMetaInfo.getPartitionStateSerializer() == null || + restoredMetaInfo.getPartitionStateSerializer() + instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) { + + // must fail now if the previous serializer cannot be restored because there is no serializer + // capable of reading previous state + // TODO when eager state registration is in place, we can try to get a convert deserializer + // TODO from the newly registered serializer instead of simply failing here + + throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]." + + " The previous serializer of the operator state must be present; the serializer could" + + " have been removed from the classpath, or its implementation have changed and could" + + " not be loaded. This is a temporary restriction that will be fixed in future versions."); + } + PartitionableListState<?> listState = registeredStates.get(restoredMetaInfo.getName()); if (null == listState) { http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java index 83aa335..ac81e86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java @@ -28,6 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -37,6 +39,8 @@ import java.io.IOException; */ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { + private static final Logger LOG = LoggerFactory.getLogger(KeyedBackendStateMetaInfoSnapshotReaderWriters.class); + // ------------------------------------------------------------------------------- // Writers // - v1: Flink 1.2.x @@ -230,6 +234,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { namespaceSerializerProxy.read(inViewWrapper); metaInfo.setNamespaceSerializer(namespaceSerializerProxy.getTypeSerializer()); } catch (IOException e) { + LOG.warn("Deserialization of previous namespace serializer errored; setting serializer to null. ", e); + metaInfo.setNamespaceSerializer(null); } @@ -241,6 +247,8 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { stateSerializerProxy.read(inViewWrapper); metaInfo.setStateSerializer(stateSerializerProxy.getTypeSerializer()); } catch (IOException e) { + LOG.warn("Deserialization of previous state serializer errored; setting serializer to null. ", e); + metaInfo.setStateSerializer(null); } http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java index 9ab106b..4f151c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java @@ -30,6 +30,8 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; @@ -39,6 +41,8 @@ import java.io.IOException; */ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { + private static final Logger LOG = LoggerFactory.getLogger(OperatorBackendStateMetaInfoSnapshotReaderWriters.class); + // ------------------------------------------------------------------------------- // Writers // - v1: Flink 1.2.x @@ -219,6 +223,8 @@ public class OperatorBackendStateMetaInfoSnapshotReaderWriters { partitionStateSerializerProxy.read(inViewWrapper); stateMetaInfo.setPartitionStateSerializer(partitionStateSerializerProxy.getTypeSerializer()); } catch (IOException e) { + LOG.warn("Deserialization of previous serializer errored; setting serializer to null. ", e); + stateMetaInfo.setPartitionStateSerializer(null); } http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 866ed28..bc314df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; @@ -389,6 +390,22 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) { + if (restoredMetaInfo.getStateSerializer() == null || + restoredMetaInfo.getStateSerializer() + instanceof TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer) { + + // must fail now if the previous serializer cannot be restored because there is no serializer + // capable of reading previous state + // TODO when eager state registration is in place, we can try to get a convert deserializer + // TODO from the newly registered serializer instead of simply failing here + + throw new IOException("Unable to restore keyed state [" + restoredMetaInfo.getName() + "]." + + " For memory-backed keyed state, the previous serializer of the keyed state must be" + + " present; the serializer could have been removed from the classpath, or its implementation" + + " have changed and could not be loaded. This is a temporary restriction that will be fixed" + + " in future versions."); + } + StateTable<K, ?, ?> stateTable = stateTables.get(restoredMetaInfo.getName()); //important: only create a new table we did not already create it previously http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index 48d56e2..fee97f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -20,27 +20,48 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.util.FutureUtil; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; +import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}. */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({KeyedBackendStateMetaInfoSnapshotReaderWriters.class, OperatorBackendStateMetaInfoSnapshotReaderWriters.class}) public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBackend> { @Override @@ -198,6 +219,120 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack } } + /** + * Verifies that the operator state backend fails with appropriate error and message if + * previous serializer can not be restored. + */ + @Test + public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() throws Exception { + AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); + + Environment env = mock(Environment.class); + when(env.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader()); + + OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name"); + + // write some state + ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); + ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); + ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); + ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1); + ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2); + ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3); + + listState1.add(42); + listState1.add(4711); + + listState2.add(7); + listState2.add(13); + listState2.add(23); + + listState3.add(17); + listState3.add(18); + listState3.add(19); + listState3.add(20); + + CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); + RunnableFuture<OperatorStateHandle> runnableFuture = + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()); + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture); + + try { + + operatorStateBackend.close(); + operatorStateBackend.dispose(); + + operatorStateBackend = abstractStateBackend.createOperatorStateBackend( + env, + "testOperator"); + + // mock failure when deserializing serializer + TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + operatorStateBackend.restore(Collections.singletonList(stateHandle)); + + fail("The operator state restore should have failed if the previous state serializer could not be loaded."); + } catch (IOException expected) { + Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state")); + } finally { + stateHandle.discardState(); + } + } + + /** + * Verifies that memory-backed keyed state backend fails with appropriate error and message if + * previous serializer can not be restored. + */ + @Test + public void testKeyedStateRestoreFailsIfSerializerDeserializationFails() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend; + + assertEquals(0, heapBackend.numStateEntries()); + + ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // write some state + backend.setCurrentKey(0); + state.update("hello"); + state.update("ciao"); + + KeyedStateHandle snapshot = runSnapshot(((HeapKeyedStateBackend<Integer>) backend).snapshot( + 682375462378L, + 2, + streamFactory, + CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + + // ========== restore snapshot ========== + + Environment env = mock(Environment.class); + when(env.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader()); + + // mock failure when deserializing serializer + TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + try { + restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + + fail("The keyed state restore should have failed if the previous state serializer could not be loaded."); + } catch (IOException expected) { + Assert.assertTrue(expected.getMessage().contains("Unable to restore keyed state")); + } + } + @Ignore @Test public void testConcurrentMapIfQueryable() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 85b9eaf..af5f0b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -22,7 +22,9 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; @@ -32,6 +34,10 @@ import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.util.FutureUtil; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import java.io.File; import java.io.IOException; @@ -52,9 +58,13 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +@RunWith(PowerMockRunner.class) +@PrepareForTest(OperatorBackendStateMetaInfoSnapshotReaderWriters.class) public class OperatorStateBackendTest { private final ClassLoader classLoader = getClass().getClassLoader(); @@ -290,7 +300,7 @@ public class OperatorStateBackendTest { @Test public void testSnapshotRestoreAsync() throws Exception { - DefaultOperatorStateBackend operatorStateBackend = + OperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackend(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true); ListStateDescriptor<MutableType> stateDescriptor1 = @@ -362,8 +372,7 @@ public class OperatorStateBackendTest { AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); - //TODO this is temporarily casted to test already functionality that we do not yet expose through public API - operatorStateBackend = (DefaultOperatorStateBackend) abstractStateBackend.createOperatorStateBackend( + operatorStateBackend = abstractStateBackend.createOperatorStateBackend( createMockEnvironment(), "testOperator"); @@ -494,6 +503,61 @@ public class OperatorStateBackendTest { } } + @Test + public void testRestoreFailsIfSerializerDeserializationFails() throws Exception { + AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); + + OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "test-op-name"); + + // write some state + ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); + ListStateDescriptor<Serializable> stateDescriptor2 = new ListStateDescriptor<>("test2", new JavaSerializer<>()); + ListStateDescriptor<Serializable> stateDescriptor3 = new ListStateDescriptor<>("test3", new JavaSerializer<>()); + ListState<Serializable> listState1 = operatorStateBackend.getListState(stateDescriptor1); + ListState<Serializable> listState2 = operatorStateBackend.getListState(stateDescriptor2); + ListState<Serializable> listState3 = operatorStateBackend.getUnionListState(stateDescriptor3); + + listState1.add(42); + listState1.add(4711); + + listState2.add(7); + listState2.add(13); + listState2.add(23); + + listState3.add(17); + listState3.add(18); + listState3.add(19); + listState3.add(20); + + CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator"); + RunnableFuture<OperatorStateHandle> runnableFuture = + operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forFullCheckpoint()); + OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture); + + try { + + operatorStateBackend.close(); + operatorStateBackend.dispose(); + + operatorStateBackend = abstractStateBackend.createOperatorStateBackend( + createMockEnvironment(), + "testOperator"); + + // mock failure when deserializing serializer + TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class); + doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); + PowerMockito.whenNew(TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); + + operatorStateBackend.restore(Collections.singletonList(stateHandle)); + + fail("The operator state restore should have failed if the previous state serializer could not be loaded."); + } catch (IOException expected) { + Assert.assertTrue(expected.getMessage().contains("Unable to restore operator state")); + } finally { + stateHandle.discardState(); + } + } + static final class MutableType implements Serializable { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/c594af09/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 96025fe..658ccde 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -2508,7 +2508,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } } - private KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception { + protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception { if(!snapshotRunnableFuture.isDone()) { Thread runner = new Thread(snapshotRunnableFuture); runner.start();