http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java new file mode 100644 index 0000000..2a6975a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils; +import org.junit.Test; + +import java.util.Map; +import java.util.Random; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.spy; + +public class IncrementalKeyedStateHandleTest { + + /** + * This test checks, that for an unregistered {@link IncrementalKeyedStateHandle} all state + * (including shared) is discarded. 
+ */ + @Test + public void testUnregisteredDiscarding() throws Exception { + IncrementalKeyedStateHandle stateHandle = create(new Random(42)); + + stateHandle.discardState(); + + for (StreamStateHandle handle : stateHandle.getPrivateState().values()) { + verify(handle).discardState(); + } + + for (StreamStateHandle handle : stateHandle.getSharedState().values()) { + verify(handle).discardState(); + } + + verify(stateHandle.getMetaStateHandle()).discardState(); + } + + /** + * This test checks, that for a registered {@link IncrementalKeyedStateHandle} discards respect + * all shared state and only discard it once all references are released. + */ + @Test + public void testSharedStateDeRegistration() throws Exception { + + Random rnd = new Random(42); + + SharedStateRegistry registry = spy(new SharedStateRegistry()); + + // Create two state handles with overlapping shared state + IncrementalKeyedStateHandle stateHandle1 = create(new Random(42)); + IncrementalKeyedStateHandle stateHandle2 = create(new Random(42)); + + // Both handles should not be registered and not discarded by now. + for (Map.Entry<StateHandleID, StreamStateHandle> entry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(0)).unregisterReference(registryKey); + verify(entry.getValue(), times(0)).discardState(); + } + + for (Map.Entry<StateHandleID, StreamStateHandle> entry : + stateHandle2.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(0)).unregisterReference(registryKey); + verify(entry.getValue(), times(0)).discardState(); + } + + // Now we register both ... 
+ stateHandle1.registerSharedStates(registry); + stateHandle2.registerSharedStates(registry); + + for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey()); + + verify(registry).registerReference( + registryKey, + stateHandleEntry.getValue()); + } + + for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry : + stateHandle2.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey()); + + verify(registry).registerReference( + registryKey, + stateHandleEntry.getValue()); + } + + // We discard the first + stateHandle1.discardState(); + + // Should be unregistered, non-shared discarded, shared not discarded + for (Map.Entry<StateHandleID, StreamStateHandle> entry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(1)).unregisterReference(registryKey); + verify(entry.getValue(), times(0)).discardState(); + } + + for (StreamStateHandle handle : + stateHandle2.getSharedState().values()) { + + verify(handle, times(0)).discardState(); + } + + for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry : + stateHandle1.getPrivateState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey()); + + verify(registry, times(0)).unregisterReference(registryKey); + verify(handleEntry.getValue(), times(1)).discardState(); + } + + for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry : + stateHandle2.getPrivateState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey()); + + verify(registry, 
times(0)).unregisterReference(registryKey); + verify(handleEntry.getValue(), times(0)).discardState(); + } + + verify(stateHandle1.getMetaStateHandle(), times(1)).discardState(); + verify(stateHandle2.getMetaStateHandle(), times(0)).discardState(); + + // We discard the second + stateHandle2.discardState(); + + + // Now everything should be unregistered and discarded + for (Map.Entry<StateHandleID, StreamStateHandle> entry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(2)).unregisterReference(registryKey); + verify(entry.getValue()).discardState(); + } + + for (Map.Entry<StateHandleID, StreamStateHandle> entry : + stateHandle2.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(2)).unregisterReference(registryKey); + verify(entry.getValue()).discardState(); + } + + verify(stateHandle1.getMetaStateHandle(), times(1)).discardState(); + verify(stateHandle2.getMetaStateHandle(), times(1)).discardState(); + } + + private static IncrementalKeyedStateHandle create(Random rnd) { + return new IncrementalKeyedStateHandle( + "test", + KeyGroupRange.of(0, 0), + 1L, + placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)), + placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)), + spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd))); + } + + private static Map<StateHandleID, StreamStateHandle> placeSpies( + Map<StateHandleID, StreamStateHandle> map) { + + for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) { + entry.setValue(spy(entry.getValue())); + } + return map; + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java index 03e2a13..4104595 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java @@ -40,14 +40,14 @@ public class SharedStateRegistryTest { // register one state TestSharedState firstState = new TestSharedState("first"); - SharedStateRegistry.Result result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstState); + SharedStateRegistry.Result result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstState); assertEquals(1, result.getReferenceCount()); assertTrue(firstState == result.getReference()); assertFalse(firstState.isDiscarded()); // register another state TestSharedState secondState = new TestSharedState("second"); - result = sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), secondState); + result = sharedStateRegistry.registerReference(secondState.getRegistrationKey(), secondState); assertEquals(1, result.getReferenceCount()); assertTrue(secondState == result.getReference()); assertFalse(firstState.isDiscarded()); @@ -55,7 +55,7 @@ public class SharedStateRegistryTest { // attempt to register state under an existing key TestSharedState firstStatePrime = new TestSharedState(firstState.getRegistrationKey().getKeyString()); - result = sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), firstStatePrime); + result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstStatePrime); assertEquals(2, 
result.getReferenceCount()); assertFalse(firstStatePrime == result.getReference()); assertTrue(firstState == result.getReference()); @@ -63,19 +63,19 @@ public class SharedStateRegistryTest { assertFalse(firstState.isDiscarded()); // reference the first state again - result = sharedStateRegistry.obtainReference(firstState.getRegistrationKey()); + result = sharedStateRegistry.registerReference(firstState.getRegistrationKey(), firstState); assertEquals(3, result.getReferenceCount()); assertTrue(firstState == result.getReference()); assertFalse(firstState.isDiscarded()); // unregister the second state - result = sharedStateRegistry.releaseReference(secondState.getRegistrationKey()); + result = sharedStateRegistry.unregisterReference(secondState.getRegistrationKey()); assertEquals(0, result.getReferenceCount()); assertTrue(result.getReference() == null); assertTrue(secondState.isDiscarded()); // unregister the first state - result = sharedStateRegistry.releaseReference(firstState.getRegistrationKey()); + result = sharedStateRegistry.unregisterReference(firstState.getRegistrationKey()); assertEquals(2, result.getReferenceCount()); assertTrue(firstState == result.getReference()); assertFalse(firstState.isDiscarded()); @@ -87,7 +87,7 @@ public class SharedStateRegistryTest { @Test(expected = IllegalStateException.class) public void testUnregisterWithUnexistedKey() { SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - sharedStateRegistry.releaseReference(new SharedStateRegistryKey("non-existent")); + sharedStateRegistry.unregisterReference(new SharedStateRegistryKey("non-existent")); } private static class TestSharedState implements StreamStateHandle { http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index b1927f1..8d4a38e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -539,7 +539,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten snapshot2.registerSharedStates(sharedStateRegistry); - snapshot.unregisterSharedStates(sharedStateRegistry); snapshot.discardState(); backend.dispose(); @@ -631,7 +630,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten snapshot2.registerSharedStates(sharedStateRegistry); - snapshot.unregisterSharedStates(sharedStateRegistry); snapshot.discardState(); backend.dispose(); http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java index 11a03cc..2251e46 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java @@ -52,10 +52,12 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp @Override public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { - checkpoints.addLast(checkpoint); checkpoint.registerSharedStates(sharedStateRegistry); + checkpoints.addLast(checkpoint); + + if (checkpoints.size() > 1) { CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst(); checkpointToSubsume.discardOnSubsume(sharedStateRegistry); @@ -76,7 +78,6 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp suspended.clear(); for (CompletedCheckpoint checkpoint : checkpoints) { - sharedStateRegistry.unregisterAll(checkpoint.getOperatorStates().values()); suspended.add(checkpoint); } http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index fea2b79..6ad7708 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; @@ -147,8 +148,14 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } case ROCKSDB_INCREMENTAL: { String rocksDb = tempFolder.newFolder().getAbsolutePath(); + String backups = tempFolder.newFolder().getAbsolutePath(); + // we use the fs backend with small threshold here to test the behaviour with file + // references, not self contained byte handles RocksDBStateBackend rdb = - new 
RocksDBStateBackend(new MemoryStateBackend(MAX_MEM_STATE_SIZE), true); + new RocksDBStateBackend( + new FsStateBackend( + new Path("file://" + backups).toUri(), 16), + true); rdb.setDbStoragePath(rocksDb); this.stateBackend = rdb; break; http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 6c70b87..f9af603 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -20,9 +20,20 @@ package org.apache.flink.test.recovery; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.PoisonPill; import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -35,6 +46,12 @@ 
import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; @@ -43,18 +60,20 @@ import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.runtime.testutils.JobManagerProcess; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; -import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.testutils.junit.RetryOnFailure; import org.apache.flink.testutils.junit.RetryRule; +import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; @@ -127,7 +146,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger { private static final int Parallelism = 8; - private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(2); + private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(4); private static AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism); @@ -137,182 +156,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { private static long LastElement = -1; - /** - * Simple checkpointed streaming sum. - * - * <p>The sources (Parallelism) count until sequenceEnd. The sink (1) sums up all counts and - * returns it to the main thread via a static variable. We wait until some checkpoints are - * completed and sanity check that the sources recover with an updated state to make sure that - * this test actually tests something. - */ - @Test - @RetryOnFailure(times=1) - public void testCheckpointedStreamingSumProgram() throws Exception { - // Config - final int checkpointingInterval = 200; - final int sequenceEnd = 5000; - final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2; - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); - env.setParallelism(Parallelism); - env.enableCheckpointing(checkpointingInterval); - - env - .addSource(new CheckpointedSequenceSource(sequenceEnd)) - .addSink(new CountingSink()) - .setParallelism(1); - - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - - Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper - .getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString()); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, Parallelism); - - ActorSystem testSystem = null; - final JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2]; - LeaderRetrievalService leaderRetrievalService = null; - ActorSystem taskManagerSystem = null; - final HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - TestingUtils.defaultExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); - - try { - final Deadline deadline = TestTimeOut.fromNow(); - - // Test actor system - testSystem = AkkaUtils.createActorSystem(new Configuration(), - new Some<>(new Tuple2<String, Object>("localhost", 0))); - - // The job managers - jobManagerProcess[0] = new JobManagerProcess(0, config); - jobManagerProcess[1] = new JobManagerProcess(1, config); - - jobManagerProcess[0].startProcess(); - jobManagerProcess[1].startProcess(); - - // Leader listener - TestingListener leaderListener = new TestingListener(); - leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID); - leaderRetrievalService.start(leaderListener); - - // The task manager - taskManagerSystem = AkkaUtils.createActorSystem( - config, Option.apply(new Tuple2<String, Object>("localhost", 0))); - TaskManager.startTaskManagerComponentsAndActor( - config, - ResourceID.generate(), - taskManagerSystem, - highAvailabilityServices, - "localhost", - Option.<String>empty(), - false, - TaskManager.class); - - { - // Initial submission - leaderListener.waitForNewLeader(deadline.timeLeft().toMillis()); - - String leaderAddress = leaderListener.getAddress(); - UUID leaderId = leaderListener.getLeaderSessionID(); - - // Get the leader ref - ActorRef leaderRef = AkkaUtils.getActorRef( - leaderAddress, testSystem, deadline.timeLeft()); - ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId); - - // Submit the job in detached mode - leader.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); - - JobManagerActorTestUtils.waitForJobStatus( - jobGraph.getJobID(), JobStatus.RUNNING, leader, deadline.timeLeft()); - } - - // Who's the boss? 
- JobManagerProcess leadingJobManagerProcess; - if (jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress())) { - leadingJobManagerProcess = jobManagerProcess[0]; - } - else { - leadingJobManagerProcess = jobManagerProcess[1]; - } - - CompletedCheckpointsLatch.await(); - - // Kill the leading job manager process - leadingJobManagerProcess.destroy(); - - { - // Recovery by the standby JobManager - leaderListener.waitForNewLeader(deadline.timeLeft().toMillis()); - - String leaderAddress = leaderListener.getAddress(); - UUID leaderId = leaderListener.getLeaderSessionID(); - - ActorRef leaderRef = AkkaUtils.getActorRef( - leaderAddress, testSystem, deadline.timeLeft()); - ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId); - - JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, - leader, deadline.timeLeft()); - } - - // Wait to finish - FinalCountLatch.await(); - - assertEquals(expectedSum, (long) FinalCount.get()); - - for (int i = 0; i < Parallelism; i++) { - assertNotEquals(0, RecoveredStates.get(i)); - } - } - catch (Throwable t) { - // Reset all static state for test retries - CompletedCheckpointsLatch = new CountDownLatch(2); - RecoveredStates = new AtomicLongArray(Parallelism); - FinalCountLatch = new CountDownLatch(1); - FinalCount = new AtomicReference<>(); - LastElement = -1; - - // Print early (in some situations the process logs get too big - // for Travis and the root problem is not shown) - t.printStackTrace(); - - // In case of an error, print the job manager process logs. 
- if (jobManagerProcess[0] != null) { - jobManagerProcess[0].printProcessLog(); - } - - if (jobManagerProcess[1] != null) { - jobManagerProcess[1].printProcessLog(); - } - - throw t; - } - finally { - if (jobManagerProcess[0] != null) { - jobManagerProcess[0].destroy(); - } - - if (jobManagerProcess[1] != null) { - jobManagerProcess[1].destroy(); - } - - if (leaderRetrievalService != null) { - leaderRetrievalService.stop(); - } - - if (taskManagerSystem != null) { - taskManagerSystem.shutdown(); - } - - if (testSystem != null) { - testSystem.shutdown(); - } - - highAvailabilityServices.closeAndCleanupAllData(); - } - } + private static final int retainedCheckpoints = 2; /** * Tests that the JobManager logs failures during recovery properly. @@ -480,13 +324,110 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { } } + @Test + public void testCheckpointedStreamingProgramIncrementalRocksDB() throws Exception { + testCheckpointedStreamingProgram( + new RocksDBStateBackend( + new FsStateBackend(FileStateBackendBasePath.getAbsoluteFile().toURI(), 16), + true)); + } + + private void testCheckpointedStreamingProgram(AbstractStateBackend stateBackend) throws Exception { + + // Config + final int checkpointingInterval = 100; + final int sequenceEnd = 5000; + final long expectedSum = Parallelism * sequenceEnd * (sequenceEnd + 1) / 2; + + final ActorSystem system = ActorSystem.create("Test", AkkaUtils.getDefaultAkkaConfig()); + final TestingServer testingServer = new TestingServer(); + final TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + + LocalFlinkMiniCluster miniCluster = null; + + final int numJMs = 2; + final int numTMs = 4; + final int numSlots = 8; + + try { + Configuration config = new Configuration(); + config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints); + config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs); + 
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots); + + + String tmpFolderString = temporaryFolder.newFolder().toString(); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tmpFolderString); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + + miniCluster = new LocalFlinkMiniCluster(config, true); + + miniCluster.start(); + + ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(Parallelism); + env.enableCheckpointing(checkpointingInterval); + + //TODO parameterize + env.setStateBackend(stateBackend); + env + .addSource(new CheckpointedSequenceSource(sequenceEnd, 1)) + .keyBy(new KeySelector<Long, Object>() { + + private static final long serialVersionUID = -8572892067702489025L; + + @Override + public Object getKey(Long value) throws Exception { + return value; + } + }) + .flatMap(new StatefulFlatMap()).setParallelism(1) + .addSink(new CountingSink()) + .setParallelism(1); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + miniCluster.submitJobDetached(jobGraph); + + CompletedCheckpointsLatch.await(); + + jmGateway.tell(PoisonPill.getInstance()); + + // Wait to finish + FinalCountLatch.await(); + + assertEquals(expectedSum, (long) FinalCount.get()); + + for (int i = 0; i < Parallelism; i++) { + assertNotEquals(0, RecoveredStates.get(i)); + } + + } finally { + if (miniCluster != null) { + miniCluster.stop(); + miniCluster.awaitTermination(); + } + + system.shutdown(); + system.awaitTermination(); + + testingServer.stop(); + testingServer.close(); + + } + } + // --------------------------------------------------------------------------------------------- /** * A checkpointed source, which emits 
elements from 0 to a configured number. */ public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> - implements ListCheckpointed<Long> { + implements ListCheckpointed<Tuple2<Long, Integer>> { private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class); @@ -496,13 +437,22 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { private final long end; - private long current = 0; + private int repeat; + + private long current; private volatile boolean isRunning = true; public CheckpointedSequenceSource(long end) { + this(end, 1); + + } + + public CheckpointedSequenceSource(long end, int repeat) { checkArgument(end >= 0, "Negative final count"); + this.current = 0; this.end = end; + this.repeat = repeat; } @Override @@ -511,8 +461,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { synchronized (ctx.getCheckpointLock()) { if (current <= end) { ctx.collect(current++); - } - else { + } else if(repeat > 0) { + --repeat; + current = 0; + } else { ctx.collect(LastElement); return; } @@ -520,32 +472,33 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { // Slow down until some checkpoints are completed if (sync.getCount() != 0) { - Thread.sleep(100); + Thread.sleep(50); } } } @Override - public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception { + public List<Tuple2<Long, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception { LOG.debug("Snapshotting state {} @ ID {}.", current, checkpointId); - return Collections.singletonList(this.current); + return Collections.singletonList(new Tuple2<>(this.current, this.repeat)); } @Override - public void restoreState(List<Long> state) throws Exception { - if (state.isEmpty() || state.size() > 1) { - throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size()); + public void restoreState(List<Tuple2<Long, Integer>> list) 
throws Exception { + if (list.isEmpty() || list.size() > 1) { + throw new RuntimeException("Test failed due to unexpected recovered state size " + list.size()); } - Long s = state.get(0); - LOG.debug("Restoring state {}", s); + Tuple2<Long, Integer> state = list.get(0); + LOG.debug("Restoring state {}", state); // This is necessary to make sure that something is recovered at all. Otherwise it // might happen that the job is restarted from the beginning. - RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s); + RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), 1); sync.countDown(); - current = s; + current = state._1; + repeat = state._2; } @Override @@ -571,6 +524,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { @Override public void invoke(Long value) throws Exception { + if (value == LastElement) { numberOfReceivedLastElements++; @@ -611,4 +565,41 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { CompletedCheckpointsLatch.countDown(); } } + + public static class StatefulFlatMap extends RichFlatMapFunction<Long, Long> implements CheckpointedFunction { + + private static final long serialVersionUID = 9031079547885320663L; + + private transient ValueState<Integer> alreadySeen; + + @Override + public void flatMap(Long input, Collector<Long> out) throws Exception { + + Integer seen = this.alreadySeen.value(); + if (seen >= Parallelism || input == -1) { + out.collect(input); + } + this.alreadySeen.update(seen + 1); + } + + @Override + public void open(Configuration config) { + + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + ValueStateDescriptor<Integer> descriptor = + new ValueStateDescriptor<>( + "seenCountState", + TypeInformation.of(new TypeHint<Integer>() {}), + 0); + alreadySeen = 
context.getKeyedStateStore().getState(descriptor); + } + } }
