Repository: flink
Updated Branches:
  refs/heads/master b8f8524af -> 0162543ac


http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
new file mode 100644
index 0000000..2a6975a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public class IncrementalKeyedStateHandleTest {
+
+       /**
+        * This test checks that for an unregistered {@link 
IncrementalKeyedStateHandle} all state
+        * (including shared) is discarded.
+        */
+       @Test
+       public void testUnregisteredDiscarding() throws Exception {
+               IncrementalKeyedStateHandle stateHandle = create(new 
Random(42));
+
+               stateHandle.discardState();
+
+               for (StreamStateHandle handle : 
stateHandle.getPrivateState().values()) {
+                       verify(handle).discardState();
+               }
+
+               for (StreamStateHandle handle : 
stateHandle.getSharedState().values()) {
+                       verify(handle).discardState();
+               }
+
+               verify(stateHandle.getMetaStateHandle()).discardState();
+       }
+
+       /**
+        * This test checks that for a registered {@link 
IncrementalKeyedStateHandle} a discard respects
+        * all shared state and only discards it once all references are released.
+        */
+       @Test
+       public void testSharedStateDeRegistration() throws Exception {
+
+               Random rnd = new Random(42);
+
+               SharedStateRegistry registry = spy(new SharedStateRegistry());
+
+               // Create two state handles with overlapping shared state
+               IncrementalKeyedStateHandle stateHandle1 = create(new 
Random(42));
+               IncrementalKeyedStateHandle stateHandle2 = create(new 
Random(42));
+
+               // Both handles should not be registered and not discarded by 
now.
+               for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+                       stateHandle1.getSharedState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+                       verify(registry, 
times(0)).unregisterReference(registryKey);
+                       verify(entry.getValue(), times(0)).discardState();
+               }
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+                       stateHandle2.getSharedState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+                       verify(registry, 
times(0)).unregisterReference(registryKey);
+                       verify(entry.getValue(), times(0)).discardState();
+               }
+
+               // Now we register both ...
+               stateHandle1.registerSharedStates(registry);
+               stateHandle2.registerSharedStates(registry);
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> 
stateHandleEntry :
+                       stateHandle1.getSharedState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+                       verify(registry).registerReference(
+                               registryKey,
+                               stateHandleEntry.getValue());
+               }
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> 
stateHandleEntry :
+                       stateHandle2.getSharedState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+                       verify(registry).registerReference(
+                               registryKey,
+                               stateHandleEntry.getValue());
+               }
+
+               // We discard the first
+               stateHandle1.discardState();
+
+               // Should be unregistered, non-shared discarded, shared not 
discarded
+               for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+                       stateHandle1.getSharedState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+                       verify(registry, 
times(1)).unregisterReference(registryKey);
+                       verify(entry.getValue(), times(0)).discardState();
+               }
+
+               for (StreamStateHandle handle :
+                       stateHandle2.getSharedState().values()) {
+
+                       verify(handle, times(0)).discardState();
+               }
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
+                       stateHandle1.getPrivateState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey());
+
+                       verify(registry, 
times(0)).unregisterReference(registryKey);
+                       verify(handleEntry.getValue(), times(1)).discardState();
+               }
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
+                       stateHandle2.getPrivateState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(handleEntry.getKey());
+
+                       verify(registry, 
times(0)).unregisterReference(registryKey);
+                       verify(handleEntry.getValue(), times(0)).discardState();
+               }
+
+               verify(stateHandle1.getMetaStateHandle(), 
times(1)).discardState();
+               verify(stateHandle2.getMetaStateHandle(), 
times(0)).discardState();
+
+               // We discard the second
+               stateHandle2.discardState();
+
+
+               // Now everything should be unregistered and discarded
+               for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+                       stateHandle1.getSharedState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+                       verify(registry, 
times(2)).unregisterReference(registryKey);
+                       verify(entry.getValue()).discardState();
+               }
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> entry :
+                       stateHandle2.getSharedState().entrySet()) {
+
+                       SharedStateRegistryKey registryKey =
+                               
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+                       verify(registry, 
times(2)).unregisterReference(registryKey);
+                       verify(entry.getValue()).discardState();
+               }
+
+               verify(stateHandle1.getMetaStateHandle(), 
times(1)).discardState();
+               verify(stateHandle2.getMetaStateHandle(), 
times(1)).discardState();
+       }
+
+       private static IncrementalKeyedStateHandle create(Random rnd) {
+               return new IncrementalKeyedStateHandle(
+                       "test",
+                       KeyGroupRange.of(0, 0),
+                       1L,
+                       
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+                       
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+                       
spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd)));
+       }
+
+       private static Map<StateHandleID, StreamStateHandle> placeSpies(
+               Map<StateHandleID, StreamStateHandle> map) {
+
+               for (Map.Entry<StateHandleID, StreamStateHandle> entry : 
map.entrySet()) {
+                       entry.setValue(spy(entry.getValue()));
+               }
+               return map;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 03e2a13..4104595 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -40,14 +40,14 @@ public class SharedStateRegistryTest {
 
                // register one state
                TestSharedState firstState = new TestSharedState("first");
-               SharedStateRegistry.Result result = 
sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), 
firstState);
+               SharedStateRegistry.Result result = 
sharedStateRegistry.registerReference(firstState.getRegistrationKey(), 
firstState);
                assertEquals(1, result.getReferenceCount());
                assertTrue(firstState == result.getReference());
                assertFalse(firstState.isDiscarded());
 
                // register another state
                TestSharedState secondState = new TestSharedState("second");
-               result = 
sharedStateRegistry.registerNewReference(secondState.getRegistrationKey(), 
secondState);
+               result = 
sharedStateRegistry.registerReference(secondState.getRegistrationKey(), 
secondState);
                assertEquals(1, result.getReferenceCount());
                assertTrue(secondState == result.getReference());
                assertFalse(firstState.isDiscarded());
@@ -55,7 +55,7 @@ public class SharedStateRegistryTest {
 
                // attempt to register state under an existing key
                TestSharedState firstStatePrime = new 
TestSharedState(firstState.getRegistrationKey().getKeyString());
-               result = 
sharedStateRegistry.registerNewReference(firstState.getRegistrationKey(), 
firstStatePrime);
+               result = 
sharedStateRegistry.registerReference(firstState.getRegistrationKey(), 
firstStatePrime);
                assertEquals(2, result.getReferenceCount());
                assertFalse(firstStatePrime == result.getReference());
                assertTrue(firstState == result.getReference());
@@ -63,19 +63,19 @@ public class SharedStateRegistryTest {
                assertFalse(firstState.isDiscarded());
 
                // reference the first state again
-               result = 
sharedStateRegistry.obtainReference(firstState.getRegistrationKey());
+               result = 
sharedStateRegistry.registerReference(firstState.getRegistrationKey(), 
firstState);
                assertEquals(3, result.getReferenceCount());
                assertTrue(firstState == result.getReference());
                assertFalse(firstState.isDiscarded());
 
                // unregister the second state
-               result = 
sharedStateRegistry.releaseReference(secondState.getRegistrationKey());
+               result = 
sharedStateRegistry.unregisterReference(secondState.getRegistrationKey());
                assertEquals(0, result.getReferenceCount());
                assertTrue(result.getReference() == null);
                assertTrue(secondState.isDiscarded());
 
                // unregister the first state
-               result = 
sharedStateRegistry.releaseReference(firstState.getRegistrationKey());
+               result = 
sharedStateRegistry.unregisterReference(firstState.getRegistrationKey());
                assertEquals(2, result.getReferenceCount());
                assertTrue(firstState == result.getReference());
                assertFalse(firstState.isDiscarded());
@@ -87,7 +87,7 @@ public class SharedStateRegistryTest {
        @Test(expected = IllegalStateException.class)
        public void testUnregisterWithUnexistedKey() {
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-               sharedStateRegistry.releaseReference(new 
SharedStateRegistryKey("non-existent"));
+               sharedStateRegistry.unregisterReference(new 
SharedStateRegistryKey("non-existent"));
        }
 
        private static class TestSharedState implements StreamStateHandle {

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index b1927f1..8d4a38e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -539,7 +539,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                snapshot2.registerSharedStates(sharedStateRegistry);
 
-               snapshot.unregisterSharedStates(sharedStateRegistry);
                snapshot.discardState();
 
                backend.dispose();
@@ -631,7 +630,6 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                snapshot2.registerSharedStates(sharedStateRegistry);
 
-               snapshot.unregisterSharedStates(sharedStateRegistry);
                snapshot.discardState();
 
                backend.dispose();

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index 11a03cc..2251e46 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -52,10 +52,12 @@ public class RecoverableCompletedCheckpointStore extends 
AbstractCompletedCheckp
 
        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
-               checkpoints.addLast(checkpoint);
 
                checkpoint.registerSharedStates(sharedStateRegistry);
 
+               checkpoints.addLast(checkpoint);
+
+
                if (checkpoints.size() > 1) {
                        CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();
                        
checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
@@ -76,7 +78,6 @@ public class RecoverableCompletedCheckpointStore extends 
AbstractCompletedCheckp
                        suspended.clear();
 
                        for (CompletedCheckpoint checkpoint : checkpoints) {
-                               
sharedStateRegistry.unregisterAll(checkpoint.getOperatorStates().values());
                                suspended.add(checkpoint);
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index fea2b79..6ad7708 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -147,8 +148,14 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        }
                        case ROCKSDB_INCREMENTAL: {
                                String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
+                               String backups = 
tempFolder.newFolder().getAbsolutePath();
+                               // we use the fs backend with small threshold 
here to test the behaviour with file
+                               // references, not self contained byte handles
                                RocksDBStateBackend rdb =
-                                       new RocksDBStateBackend(new 
MemoryStateBackend(MAX_MEM_STATE_SIZE), true);
+                                       new RocksDBStateBackend(
+                                               new FsStateBackend(
+                                                       new Path("file://" + 
backups).toUri(), 16),
+                                               true);
                                rdb.setDbStoragePath(rocksDb);
                                this.stateBackend = rdb;
                                break;

http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 6c70b87..f9af603 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -20,9 +20,20 @@ package org.apache.flink.test.recovery;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import org.apache.commons.io.FileUtils;
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -35,6 +46,12 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
@@ -43,18 +60,20 @@ import 
org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
-import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.testutils.junit.RetryOnFailure;
 import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -127,7 +146,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
        private static final int Parallelism = 8;
 
-       private static CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(2);
+       private static CountDownLatch CompletedCheckpointsLatch = new 
CountDownLatch(4);
 
        private static AtomicLongArray RecoveredStates = new 
AtomicLongArray(Parallelism);
 
@@ -137,182 +156,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
        private static long LastElement = -1;
 
-       /**
-        * Simple checkpointed streaming sum.
-        *
-        * <p>The sources (Parallelism) count until sequenceEnd. The sink (1) 
sums up all counts and
-        * returns it to the main thread via a static variable. We wait until 
some checkpoints are
-        * completed and sanity check that the sources recover with an updated 
state to make sure that
-        * this test actually tests something.
-        */
-       @Test
-       @RetryOnFailure(times=1)
-       public void testCheckpointedStreamingSumProgram() throws Exception {
-               // Config
-               final int checkpointingInterval = 200;
-               final int sequenceEnd = 5000;
-               final long expectedSum = Parallelism * sequenceEnd * 
(sequenceEnd + 1) / 2;
-
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
-               env.setParallelism(Parallelism);
-               env.enableCheckpointing(checkpointingInterval);
-
-               env
-                               .addSource(new 
CheckpointedSequenceSource(sequenceEnd))
-                               .addSink(new CountingSink())
-                               .setParallelism(1);
-
-               JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-
-               Configuration config = 
ZooKeeperTestUtils.createZooKeeperHAConfig(ZooKeeper
-                               .getConnectString(), 
FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
-               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
Parallelism);
-
-               ActorSystem testSystem = null;
-               final JobManagerProcess[] jobManagerProcess = new 
JobManagerProcess[2];
-               LeaderRetrievalService leaderRetrievalService = null;
-               ActorSystem taskManagerSystem = null;
-               final HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-                       config,
-                       TestingUtils.defaultExecutor(),
-                       
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-               try {
-                       final Deadline deadline = TestTimeOut.fromNow();
-
-                       // Test actor system
-                       testSystem = AkkaUtils.createActorSystem(new 
Configuration(),
-                                       new Some<>(new Tuple2<String, 
Object>("localhost", 0)));
-
-                       // The job managers
-                       jobManagerProcess[0] = new JobManagerProcess(0, config);
-                       jobManagerProcess[1] = new JobManagerProcess(1, config);
-
-                       jobManagerProcess[0].startProcess();
-                       jobManagerProcess[1].startProcess();
-
-                       // Leader listener
-                       TestingListener leaderListener = new TestingListener();
-                       leaderRetrievalService = 
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID);
-                       leaderRetrievalService.start(leaderListener);
-
-                       // The task manager
-                       taskManagerSystem = AkkaUtils.createActorSystem(
-                                       config, Option.apply(new Tuple2<String, 
Object>("localhost", 0)));
-                       TaskManager.startTaskManagerComponentsAndActor(
-                               config,
-                               ResourceID.generate(),
-                               taskManagerSystem,
-                               highAvailabilityServices,
-                               "localhost",
-                               Option.<String>empty(),
-                               false,
-                               TaskManager.class);
-
-                       {
-                               // Initial submission
-                               
leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-                               String leaderAddress = 
leaderListener.getAddress();
-                               UUID leaderId = 
leaderListener.getLeaderSessionID();
-
-                               // Get the leader ref
-                               ActorRef leaderRef = AkkaUtils.getActorRef(
-                                               leaderAddress, testSystem, 
deadline.timeLeft());
-                               ActorGateway leader = new 
AkkaActorGateway(leaderRef, leaderId);
-
-                               // Submit the job in detached mode
-                               leader.tell(new SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED));
-
-                               JobManagerActorTestUtils.waitForJobStatus(
-                                               jobGraph.getJobID(), 
JobStatus.RUNNING, leader, deadline.timeLeft());
-                       }
-
-                       // Who's the boss?
-                       JobManagerProcess leadingJobManagerProcess;
-                       if 
(jobManagerProcess[0].getJobManagerAkkaURL(deadline.timeLeft()).equals(leaderListener.getAddress()))
 {
-                               leadingJobManagerProcess = jobManagerProcess[0];
-                       }
-                       else {
-                               leadingJobManagerProcess = jobManagerProcess[1];
-                       }
-
-                       CompletedCheckpointsLatch.await();
-
-                       // Kill the leading job manager process
-                       leadingJobManagerProcess.destroy();
-
-                       {
-                               // Recovery by the standby JobManager
-                               
leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
-
-                               String leaderAddress = 
leaderListener.getAddress();
-                               UUID leaderId = 
leaderListener.getLeaderSessionID();
-
-                               ActorRef leaderRef = AkkaUtils.getActorRef(
-                                               leaderAddress, testSystem, 
deadline.timeLeft());
-                               ActorGateway leader = new 
AkkaActorGateway(leaderRef, leaderId);
-
-                               
JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), 
JobStatus.RUNNING,
-                                               leader, deadline.timeLeft());
-                       }
-
-                       // Wait to finish
-                       FinalCountLatch.await();
-
-                       assertEquals(expectedSum, (long) FinalCount.get());
-
-                       for (int i = 0; i < Parallelism; i++) {
-                               assertNotEquals(0, RecoveredStates.get(i));
-                       }
-               }
-               catch (Throwable t) {
-                       // Reset all static state for test retries
-                       CompletedCheckpointsLatch = new CountDownLatch(2);
-                       RecoveredStates = new AtomicLongArray(Parallelism);
-                       FinalCountLatch = new CountDownLatch(1);
-                       FinalCount = new AtomicReference<>();
-                       LastElement = -1;
-
-                       // Print early (in some situations the process logs get 
too big
-                       // for Travis and the root problem is not shown)
-                       t.printStackTrace();
-
-                       // In case of an error, print the job manager process 
logs.
-                       if (jobManagerProcess[0] != null) {
-                               jobManagerProcess[0].printProcessLog();
-                       }
-
-                       if (jobManagerProcess[1] != null) {
-                               jobManagerProcess[1].printProcessLog();
-                       }
-
-                       throw t;
-               }
-               finally {
-                       if (jobManagerProcess[0] != null) {
-                               jobManagerProcess[0].destroy();
-                       }
-
-                       if (jobManagerProcess[1] != null) {
-                               jobManagerProcess[1].destroy();
-                       }
-
-                       if (leaderRetrievalService != null) {
-                               leaderRetrievalService.stop();
-                       }
-
-                       if (taskManagerSystem != null) {
-                               taskManagerSystem.shutdown();
-                       }
-
-                       if (testSystem != null) {
-                               testSystem.shutdown();
-                       }
-
-                       highAvailabilityServices.closeAndCleanupAllData();
-               }
-       }
+       private static final int retainedCheckpoints = 2;
 
        /**
         * Tests that the JobManager logs failures during recovery properly.
@@ -480,13 +324,110 @@ public class JobManagerHACheckpointRecoveryITCase 
extends TestLogger {
                }
        }
 
+       @Test
+       public void testCheckpointedStreamingProgramIncrementalRocksDB() throws 
Exception {
+               testCheckpointedStreamingProgram(
+                       new RocksDBStateBackend(
+                               new 
FsStateBackend(FileStateBackendBasePath.getAbsoluteFile().toURI(), 16),
+                               true));
+       }
+
+       private void testCheckpointedStreamingProgram(AbstractStateBackend 
stateBackend) throws Exception {
+
+               // Config
+               final int checkpointingInterval = 100;
+               final int sequenceEnd = 5000;
+               final long expectedSum = Parallelism * sequenceEnd * 
(sequenceEnd + 1) / 2;
+
+               final ActorSystem system = ActorSystem.create("Test", 
AkkaUtils.getDefaultAkkaConfig());
+               final TestingServer testingServer = new TestingServer();
+               final TemporaryFolder temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+
+               LocalFlinkMiniCluster miniCluster = null;
+
+               final int numJMs = 2;
+               final int numTMs = 4;
+               final int numSlots = 8;
+
+               try {
+                       Configuration config = new Configuration();
+                       config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, 
retainedCheckpoints);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+
+
+                       String tmpFolderString = 
temporaryFolder.newFolder().toString();
+                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, tmpFolderString);
+                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
testingServer.getConnectString());
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
+
+                       miniCluster = new LocalFlinkMiniCluster(config, true);
+
+                       miniCluster.start();
+
+                       ActorGateway jmGateway = 
miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+                       env.setParallelism(Parallelism);
+                       env.enableCheckpointing(checkpointingInterval);
+
+                       //TODO parameterize
+                       env.setStateBackend(stateBackend);
+                       env
+                               .addSource(new 
CheckpointedSequenceSource(sequenceEnd, 1))
+                               .keyBy(new KeySelector<Long, Object>() {
+
+                                       private static final long 
serialVersionUID = -8572892067702489025L;
+
+                                       @Override
+                                       public Object getKey(Long value) throws 
Exception {
+                                               return value;
+                                       }
+                               })
+                               .flatMap(new 
StatefulFlatMap()).setParallelism(1)
+                               .addSink(new CountingSink())
+                               .setParallelism(1);
+
+                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+                       miniCluster.submitJobDetached(jobGraph);
+
+                       CompletedCheckpointsLatch.await();
+
+                       jmGateway.tell(PoisonPill.getInstance());
+
+                       // Wait to finish
+                       FinalCountLatch.await();
+
+                       assertEquals(expectedSum, (long) FinalCount.get());
+
+                       for (int i = 0; i < Parallelism; i++) {
+                               assertNotEquals(0, RecoveredStates.get(i));
+                       }
+
+               } finally {
+                       if (miniCluster != null) {
+                               miniCluster.stop();
+                               miniCluster.awaitTermination();
+                       }
+
+                       system.shutdown();
+                       system.awaitTermination();
+
+                       testingServer.stop();
+                       testingServer.close();
+
+               }
+       }
+
        // 
---------------------------------------------------------------------------------------------
 
        /**
         * A checkpointed source, which emits elements from 0 to a configured 
number.
         */
        public static class CheckpointedSequenceSource extends 
RichParallelSourceFunction<Long>
-                       implements ListCheckpointed<Long> {
+                       implements ListCheckpointed<Tuple2<Long, Integer>> {
 
                private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointedSequenceSource.class);
 
@@ -496,13 +437,22 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                private final long end;
 
-               private long current = 0;
+               private int repeat;
+
+               private long current;
 
                private volatile boolean isRunning = true;
 
                public CheckpointedSequenceSource(long end) {
+                       this(end, 1);
+
+               }
+
+               public CheckpointedSequenceSource(long end, int repeat) {
                        checkArgument(end >= 0, "Negative final count");
+                       this.current = 0;
                        this.end = end;
+                       this.repeat = repeat;
                }
 
                @Override
@@ -511,8 +461,10 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                                synchronized (ctx.getCheckpointLock()) {
                                        if (current <= end) {
                                                ctx.collect(current++);
-                                       }
-                                       else {
+                                       } else if(repeat > 0) {
+                                               --repeat;
+                                               current = 0;
+                                       } else {
                                                ctx.collect(LastElement);
                                                return;
                                        }
@@ -520,32 +472,33 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                                // Slow down until some checkpoints are 
completed
                                if (sync.getCount() != 0) {
-                                       Thread.sleep(100);
+                                       Thread.sleep(50);
                                }
                        }
                }
 
                @Override
-               public List<Long> snapshotState(long checkpointId, long 
timestamp) throws Exception {
+               public List<Tuple2<Long, Integer>> snapshotState(long 
checkpointId, long timestamp) throws Exception {
                        LOG.debug("Snapshotting state {} @ ID {}.", current, 
checkpointId);
-                       return Collections.singletonList(this.current);
+                       return Collections.singletonList(new 
Tuple2<>(this.current, this.repeat));
                }
 
                @Override
-               public void restoreState(List<Long> state) throws Exception {
-                       if (state.isEmpty() || state.size() > 1) {
-                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + state.size());
+               public void restoreState(List<Tuple2<Long, Integer>> list) 
throws Exception {
+                       if (list.isEmpty() || list.size() > 1) {
+                               throw new RuntimeException("Test failed due to 
unexpected recovered state size " + list.size());
                        }
-                       Long s = state.get(0);
-                       LOG.debug("Restoring state {}", s);
+                       Tuple2<Long, Integer> state = list.get(0);
+                       LOG.debug("Restoring state {}", state);
 
                        // This is necessary to make sure that something is 
recovered at all. Otherwise it
                        // might happen that the job is restarted from the 
beginning.
-                       
RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), s);
+                       
RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), 1);
 
                        sync.countDown();
 
-                       current = s;
+                       current = state._1;
+                       repeat = state._2;
                }
 
                @Override
@@ -571,6 +524,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                @Override
                public void invoke(Long value) throws Exception {
+
                        if (value == LastElement) {
                                numberOfReceivedLastElements++;
 
@@ -611,4 +565,41 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
                        CompletedCheckpointsLatch.countDown();
                }
        }
+
+       public static class StatefulFlatMap extends RichFlatMapFunction<Long, 
Long> implements CheckpointedFunction {
+
+               private static final long serialVersionUID = 
9031079547885320663L;
+
+               private transient ValueState<Integer> alreadySeen;
+
+               @Override
+               public void flatMap(Long input, Collector<Long> out) throws 
Exception {
+
+                       Integer seen = this.alreadySeen.value();
+                       if (seen >= Parallelism || input == -1) {
+                               out.collect(input);
+                       }
+                       this.alreadySeen.update(seen + 1);
+               }
+
+               @Override
+               public void open(Configuration config) {
+
+               }
+
+               @Override
+               public void snapshotState(FunctionSnapshotContext context) 
throws Exception {
+
+               }
+
+               @Override
+               public void initializeState(FunctionInitializationContext 
context) throws Exception {
+                       ValueStateDescriptor<Integer> descriptor =
+                               new ValueStateDescriptor<>(
+                                       "seenCountState",
+                                       TypeInformation.of(new 
TypeHint<Integer>() {}),
+                                       0);
+                       alreadySeen = 
context.getKeyedStateStore().getState(descriptor);
+               }
+       }
 }

Reply via email to