http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 96c95ea..8156964 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -328,7 +328,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { } } - static class TestOperatorSubtaskState extends OperatorSubtaskState { + public static class TestOperatorSubtaskState extends OperatorSubtaskState { private static final long serialVersionUID = 522580433699164230L; boolean registered; @@ -359,6 +359,14 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger { registered = false; discarded = false; } + + public boolean isRegistered() { + return registered; + } + + public boolean isDiscarded() { + return discarded; + } } }
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java new file mode 100644 index 0000000..09c9efb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskStateTest.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.function.Function; + +import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewKeyedStateHandle; +import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewOperatorStateHandle; +import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.deepDummyCopy; + +public class PrioritizedOperatorSubtaskStateTest extends TestLogger { + + private final Random random = new Random(0x42); + + /** + * This tests attempts to test (almost) the full space of significantly different options for verifying and + * prioritizing {@link OperatorSubtaskState} options for local recovery over primary/remote state handles. + */ + @Test + public void testPrioritization() { + + for (int i = 0; i < 81; ++i) { // 3^4 possible configurations. + + OperatorSubtaskState primaryAndFallback = generateForConfiguration(i); + + for (int j = 0; j < 9; ++j) { // we test 3^2 configurations. + // mode 0: one valid state handle (deep copy of original). + // mode 1: empty StateHandleCollection. + // mode 2: one invalid state handle (e.g. wrong key group, different meta data) + int modeFirst = j % 3; + OperatorSubtaskState bestAlternative = createAlternativeSubtaskState(primaryAndFallback, modeFirst); + int modeSecond = (j / 3) % 3; + OperatorSubtaskState secondBestAlternative = createAlternativeSubtaskState(primaryAndFallback, modeSecond); + + List<OperatorSubtaskState> orderedAlternativesList = + Arrays.asList(bestAlternative, secondBestAlternative); + List<OperatorSubtaskState> validAlternativesList = new ArrayList<>(3); + if (modeFirst == 0) { + validAlternativesList.add(bestAlternative); + } + if (modeSecond == 0) { + validAlternativesList.add(secondBestAlternative); + } + validAlternativesList.add(primaryAndFallback); + + PrioritizedOperatorSubtaskState.Builder builder = + new PrioritizedOperatorSubtaskState.Builder(primaryAndFallback, orderedAlternativesList); + + PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskState = builder.build(); + + OperatorSubtaskState[] validAlternatives = + validAlternativesList.toArray(new OperatorSubtaskState[validAlternativesList.size()]); + + OperatorSubtaskState[] onlyPrimary = + new OperatorSubtaskState[]{primaryAndFallback}; + + Assert.assertTrue(checkResultAsExpected( + OperatorSubtaskState::getManagedOperatorState, + PrioritizedOperatorSubtaskState::getPrioritizedManagedOperatorState, + prioritizedOperatorSubtaskState, + primaryAndFallback.getManagedOperatorState().size() == 1 ? validAlternatives : onlyPrimary)); + + Assert.assertTrue(checkResultAsExpected( + OperatorSubtaskState::getManagedKeyedState, + PrioritizedOperatorSubtaskState::getPrioritizedManagedKeyedState, + prioritizedOperatorSubtaskState, + primaryAndFallback.getManagedKeyedState().size() == 1 ? validAlternatives : onlyPrimary)); + + Assert.assertTrue(checkResultAsExpected( + OperatorSubtaskState::getRawOperatorState, + PrioritizedOperatorSubtaskState::getPrioritizedRawOperatorState, + prioritizedOperatorSubtaskState, + primaryAndFallback.getRawOperatorState().size() == 1 ? validAlternatives : onlyPrimary)); + + Assert.assertTrue(checkResultAsExpected( + OperatorSubtaskState::getRawKeyedState, + PrioritizedOperatorSubtaskState::getPrioritizedRawKeyedState, + prioritizedOperatorSubtaskState, + primaryAndFallback.getRawKeyedState().size() == 1 ? validAlternatives : onlyPrimary)); + } + } + } + + /** + * Generator for all 3^4 = 81 possible configurations of a OperatorSubtaskState: + * - 4 different sub-states: + * managed/raw + operator/keyed. + * - 3 different options per sub-state: + * empty (simulate no state), single handle (simulate recovery), 2 handles (simulate e.g. rescaling) + */ + private OperatorSubtaskState generateForConfiguration(int conf) { + + Preconditions.checkState(conf >= 0 && conf <= 80); // 3^4 + final int numModes = 3; + + KeyGroupRange keyGroupRange = new KeyGroupRange(0, 4); + KeyGroupRange keyGroupRange1 = new KeyGroupRange(0, 2); + KeyGroupRange keyGroupRange2 = new KeyGroupRange(3, 4); + + int div = 1; + int mode = (conf / div) % numModes; + StateObjectCollection<OperatorStateHandle> s1 = + mode == 0 ? + StateObjectCollection.empty() : + mode == 1 ? + new StateObjectCollection<>( + Collections.singletonList(createNewOperatorStateHandle(2, random))) : + new StateObjectCollection<>( + Arrays.asList( + createNewOperatorStateHandle(2, random), + createNewOperatorStateHandle(2, random))); + div *= numModes; + mode = (conf / div) % numModes; + StateObjectCollection<OperatorStateHandle> s2 = + mode == 0 ? + StateObjectCollection.empty() : + mode == 1 ? + new StateObjectCollection<>( + Collections.singletonList(createNewOperatorStateHandle(2, random))) : + new StateObjectCollection<>( + Arrays.asList( + createNewOperatorStateHandle(2, random), + createNewOperatorStateHandle(2, random))); + + div *= numModes; + mode = (conf / div) % numModes; + StateObjectCollection<KeyedStateHandle> s3 = + mode == 0 ? + StateObjectCollection.empty() : + mode == 1 ? + new StateObjectCollection<>( + Collections.singletonList(createNewKeyedStateHandle(keyGroupRange))) : + new StateObjectCollection<>( + Arrays.asList( + createNewKeyedStateHandle(keyGroupRange1), + createNewKeyedStateHandle(keyGroupRange2))); + + div *= numModes; + mode = (conf / div) % numModes; + StateObjectCollection<KeyedStateHandle> s4 = + mode == 0 ? + StateObjectCollection.empty() : + mode == 1 ? + new StateObjectCollection<>( + Collections.singletonList(createNewKeyedStateHandle(keyGroupRange))) : + new StateObjectCollection<>( + Arrays.asList( + createNewKeyedStateHandle(keyGroupRange1), + createNewKeyedStateHandle(keyGroupRange2))); + + return new OperatorSubtaskState(s1, s2, s3, s4); + } + + /** + * For all 4 sub-states: + * - mode 0: One valid state handle (deep copy of original). Only this creates an OperatorSubtaskState that + * qualifies as alternative. + * - mode 1: Empty StateHandleCollection. + * - mode 2: One invalid state handle (e.g. wrong key group, different meta data) + */ + private OperatorSubtaskState createAlternativeSubtaskState(OperatorSubtaskState primaryOriginal, int mode) { + switch (mode) { + case 0: + return new OperatorSubtaskState( + deepCopyFirstElement(primaryOriginal.getManagedOperatorState()), + deepCopyFirstElement(primaryOriginal.getRawOperatorState()), + deepCopyFirstElement(primaryOriginal.getManagedKeyedState()), + deepCopyFirstElement(primaryOriginal.getRawKeyedState())); + case 1: + return new OperatorSubtaskState(); + case 2: + KeyGroupRange otherRange = new KeyGroupRange(8, 16); + int numNamedStates = 2; + return new OperatorSubtaskState( + createNewOperatorStateHandle(numNamedStates, random), + createNewOperatorStateHandle(numNamedStates, random), + createNewKeyedStateHandle(otherRange), + createNewKeyedStateHandle(otherRange)); + default: + throw new IllegalArgumentException("Mode: " + mode); + } + } + + private <T extends StateObject> boolean checkResultAsExpected( + Function<OperatorSubtaskState, StateObjectCollection<T>> extractor, + Function<PrioritizedOperatorSubtaskState, Iterator<StateObjectCollection<T>>> extractor2, + PrioritizedOperatorSubtaskState prioritizedResult, + OperatorSubtaskState... expectedOrdered) { + + List<StateObjectCollection<T>> collector = new ArrayList<>(expectedOrdered.length); + for (OperatorSubtaskState operatorSubtaskState : expectedOrdered) { + collector.add(extractor.apply(operatorSubtaskState)); + } + + return checkRepresentSameOrder( + extractor2.apply(prioritizedResult), + collector.toArray(new StateObjectCollection[collector.size()])); + } + + private boolean checkRepresentSameOrder( + Iterator<? extends StateObjectCollection<?>> ordered, + StateObjectCollection<?>... expectedOrder) { + + for (StateObjectCollection<?> objects : expectedOrder) { + if (!ordered.hasNext() || !checkContainedObjectsReferentialEquality(objects, ordered.next())) { + return false; + } + } + + return !ordered.hasNext(); + } + + /** + * Returns true iff, in iteration order, all objects in the first collection are equal by reference to their + * corresponding object (by order) in the second collection and the size of the collections is equal. + */ + public boolean checkContainedObjectsReferentialEquality(StateObjectCollection<?> a, StateObjectCollection<?> b) { + + if (a == b) { + return true; + } + + if(a == null || b == null) { + return false; + } + + if (a.size() != b.size()) { + return false; + } + + Iterator<?> bIter = b.iterator(); + for (StateObject stateObject : a) { + if (!bIter.hasNext() || bIter.next() != stateObject) { + return false; + } + } + return true; + } + + /** + * Creates a deep copy of the first state object in the given collection, or null if the collection is empy. + */ + private <T extends StateObject> T deepCopyFirstElement(StateObjectCollection<T> original) { + if (original.isEmpty()) { + return null; + } + + T stateObject = original.iterator().next(); + StateObject result; + if (stateObject instanceof OperatorStreamStateHandle) { + result = deepDummyCopy((OperatorStateHandle) stateObject); + } else if (stateObject instanceof KeyedStateHandle) { + result = deepDummyCopy((KeyedStateHandle) stateObject); + } else { + throw new IllegalStateException(); + } + return (T) result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateHandleDummyUtil.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateHandleDummyUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateHandleDummyUtil.java new file mode 100644 index 0000000..548ca18e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateHandleDummyUtil.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + +public class StateHandleDummyUtil { + + /** + * Creates a new test {@link OperatorStreamStateHandle} with a given number of randomly created named states. + */ + public static OperatorStateHandle createNewOperatorStateHandle(int numNamedStates, Random random) { + Map<String, OperatorStateHandle.StateMetaInfo> operatorStateMetaData = new HashMap<>(numNamedStates); + byte[] streamData = new byte[numNamedStates * 4]; + random.nextBytes(streamData); + long off = 0; + for (int i = 0; i < numNamedStates; ++i) { + long[] offsets = new long[4]; + for (int o = 0; o < offsets.length; ++o) { + offsets[o] = off++; + } + OperatorStateHandle.StateMetaInfo metaInfo = + new OperatorStateHandle.StateMetaInfo(offsets, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE); + operatorStateMetaData.put(String.valueOf(UUID.randomUUID()), metaInfo); + } + ByteStreamStateHandle byteStreamStateHandle = + new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), streamData); + return new OperatorStreamStateHandle(operatorStateMetaData, byteStreamStateHandle); + } + + /** + * Creates a new test {@link KeyedStateHandle} for the given key-group. + */ + public static KeyedStateHandle createNewKeyedStateHandle(KeyGroupRange keyGroupRange) { + return new DummyKeyedStateHandle(keyGroupRange); + } + + /** + * Creates a deep copy of the given {@link OperatorStreamStateHandle}. + */ + public static OperatorStateHandle deepDummyCopy(OperatorStateHandle original) { + + if (original == null) { + return null; + } + + ByteStreamStateHandle stateHandle = (ByteStreamStateHandle) original.getDelegateStateHandle(); + ByteStreamStateHandle stateHandleCopy = new ByteStreamStateHandle( + String.valueOf(stateHandle.getHandleName()), + stateHandle.getData().clone()); + Map<String, OperatorStateHandle.StateMetaInfo> offsets = original.getStateNameToPartitionOffsets(); + Map<String, OperatorStateHandle.StateMetaInfo> offsetsCopy = new HashMap<>(offsets.size()); + + for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : offsets.entrySet()) { + OperatorStateHandle.StateMetaInfo metaInfo = entry.getValue(); + OperatorStateHandle.StateMetaInfo metaInfoCopy = + new OperatorStateHandle.StateMetaInfo(metaInfo.getOffsets(), metaInfo.getDistributionMode()); + offsetsCopy.put(String.valueOf(entry.getKey()), metaInfoCopy); + } + return new OperatorStreamStateHandle(offsetsCopy, stateHandleCopy); + } + + /** + * Creates deep copy of the given {@link KeyedStateHandle}. + */ + public static KeyedStateHandle deepDummyCopy(KeyedStateHandle original) { + + if (original == null) { + return null; + } + + KeyGroupRange keyGroupRange = original.getKeyGroupRange(); + return new DummyKeyedStateHandle( + new KeyGroupRange(keyGroupRange.getStartKeyGroup(), keyGroupRange.getEndKeyGroup())); + } + + /** + * KeyedStateHandle that only holds a key-group information. + */ + private static class DummyKeyedStateHandle implements KeyedStateHandle { + + private static final long serialVersionUID = 1L; + + private final KeyGroupRange keyGroupRange; + + private DummyKeyedStateHandle(KeyGroupRange keyGroupRange) { + this.keyGroupRange = keyGroupRange; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + return new DummyKeyedStateHandle(this.keyGroupRange.getIntersection(keyGroupRange)); + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + } + + @Override + public void discardState() throws Exception { + } + + @Override + public long getStateSize() { + return 0L; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java new file mode 100644 index 0000000..b12ee27 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.util.MethodForwardingTestUtil; +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link StateObjectCollection}. + */ +public class StateObjectCollectionTest extends TestLogger { + + @Test + public void testEmptyCollection() { + StateObjectCollection<StateObject> empty = StateObjectCollection.empty(); + Assert.assertEquals(0, empty.getStateSize()); + } + + @Test + public void testForwardingCollectionMethods() throws Exception { + MethodForwardingTestUtil.testMethodForwarding( + Collection.class, + ((Function<Collection, StateObjectCollection>) StateObjectCollection::new)); + } + + @Test + public void testForwardingStateObjectMethods() throws Exception { + MethodForwardingTestUtil.testMethodForwarding( + StateObject.class, + object -> new StateObjectCollection<>(Collections.singletonList(object))); + } + + @Test + public void testHasState() { + StateObjectCollection<StateObject> stateObjects = new StateObjectCollection<>(new ArrayList<>()); + Assert.assertFalse(stateObjects.hasState()); + + stateObjects = new StateObjectCollection<>(Collections.singletonList(null)); + Assert.assertFalse(stateObjects.hasState()); + + stateObjects = new StateObjectCollection<>(Collections.singletonList(mock(StateObject.class))); + Assert.assertTrue(stateObjects.hasState()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshotTest.java new file mode 100644 index 0000000..76f3906 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshotTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Random; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class TaskStateSnapshotTest extends TestLogger { + + @Test + public void putGetSubtaskStateByOperatorID() { + TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); + + OperatorID operatorID_1 = new OperatorID(); + OperatorID operatorID_2 = new OperatorID(); + OperatorSubtaskState operatorSubtaskState_1 = new OperatorSubtaskState(); + OperatorSubtaskState operatorSubtaskState_2 = new OperatorSubtaskState(); + OperatorSubtaskState operatorSubtaskState_1_replace = new OperatorSubtaskState(); + + Assert.assertNull(taskStateSnapshot.getSubtaskStateByOperatorID(operatorID_1)); + Assert.assertNull(taskStateSnapshot.getSubtaskStateByOperatorID(operatorID_2)); + taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, operatorSubtaskState_1); + taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_2, operatorSubtaskState_2); + Assert.assertEquals(operatorSubtaskState_1, taskStateSnapshot.getSubtaskStateByOperatorID(operatorID_1)); + Assert.assertEquals(operatorSubtaskState_2, taskStateSnapshot.getSubtaskStateByOperatorID(operatorID_2)); + Assert.assertEquals(operatorSubtaskState_1, taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, operatorSubtaskState_1_replace)); + Assert.assertEquals(operatorSubtaskState_1_replace, taskStateSnapshot.getSubtaskStateByOperatorID(operatorID_1)); + } + + @Test + public void hasState() { + Random random = new Random(0x42); + TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); + Assert.assertFalse(taskStateSnapshot.hasState()); + + OperatorSubtaskState emptyOperatorSubtaskState = new OperatorSubtaskState(); + Assert.assertFalse(emptyOperatorSubtaskState.hasState()); + taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), emptyOperatorSubtaskState); + Assert.assertFalse(taskStateSnapshot.hasState()); + + OperatorStateHandle stateHandle = StateHandleDummyUtil.createNewOperatorStateHandle(2, random); + OperatorSubtaskState nonEmptyOperatorSubtaskState = new OperatorSubtaskState( + stateHandle, + null, + null, + null + ); + + Assert.assertTrue(nonEmptyOperatorSubtaskState.hasState()); + taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), nonEmptyOperatorSubtaskState); + Assert.assertTrue(taskStateSnapshot.hasState()); + } + + @Test + public void discardState() throws Exception { + TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); + OperatorID operatorID_1 = new OperatorID(); + OperatorID operatorID_2 = new OperatorID(); + + OperatorSubtaskState operatorSubtaskState_1 = mock(OperatorSubtaskState.class); + OperatorSubtaskState operatorSubtaskState_2 = mock(OperatorSubtaskState.class); + + taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, operatorSubtaskState_1); + taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_2, operatorSubtaskState_2); + + taskStateSnapshot.discardState(); + verify(operatorSubtaskState_1).discardState(); + verify(operatorSubtaskState_2).discardState(); + } + + @Test + public void getStateSize() { + Random random = new Random(0x42); + TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); + Assert.assertEquals(0, taskStateSnapshot.getStateSize()); + + OperatorSubtaskState emptyOperatorSubtaskState = new OperatorSubtaskState(); + Assert.assertFalse(emptyOperatorSubtaskState.hasState()); + taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), emptyOperatorSubtaskState); + Assert.assertEquals(0, taskStateSnapshot.getStateSize()); + + + OperatorStateHandle stateHandle_1 = StateHandleDummyUtil.createNewOperatorStateHandle(2, random); + OperatorSubtaskState nonEmptyOperatorSubtaskState_1 = new OperatorSubtaskState( + stateHandle_1, + null, + null, + null + ); + + OperatorStateHandle stateHandle_2 = StateHandleDummyUtil.createNewOperatorStateHandle(2, random); + OperatorSubtaskState nonEmptyOperatorSubtaskState_2 = new OperatorSubtaskState( + null, + stateHandle_2, + null, + null + ); + + taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), nonEmptyOperatorSubtaskState_1); + taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), nonEmptyOperatorSubtaskState_2); + + long totalSize = stateHandle_1.getStateSize() + stateHandle_2.getStateSize(); + Assert.assertEquals(totalSize, taskStateSnapshot.getStateSize()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java index d1d67ff..1963766 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java @@ -33,10 +33,10 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.StringUtils; import java.util.ArrayList; @@ -87,24 +87,24 @@ public class CheckpointTestUtils { for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) { StreamStateHandle operatorStateBackend = - new TestByteStreamStateHandleDeepCompare("b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET)); + new ByteStreamStateHandle("b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET)); StreamStateHandle operatorStateStream = - new TestByteStreamStateHandleDeepCompare("b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET)); + new ByteStreamStateHandle("b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET)); OperatorStateHandle operatorStateHandleBackend = null; OperatorStateHandle operatorStateHandleStream = null; - Map<String, StateMetaInfo> offsetsMap = new HashMap<>(); + Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(); offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.UNION)); if (hasOperatorStateBackend) { - operatorStateHandleBackend = new OperatorStateHandle(offsetsMap, operatorStateBackend); + operatorStateHandleBackend = new OperatorStreamStateHandle(offsetsMap, operatorStateBackend); } if (hasOperatorStateStream) { - operatorStateHandleStream = new OperatorStateHandle(offsetsMap, operatorStateStream); + operatorStateHandleStream = new OperatorStreamStateHandle(offsetsMap, operatorStateStream); } KeyedStateHandle keyedStateBackend = null; @@ -173,23 +173,23 @@ public class CheckpointTestUtils { for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) { StreamStateHandle operatorStateBackend = - new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET)); + new ByteStreamStateHandle("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET)); StreamStateHandle operatorStateStream = - new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET)); - Map<String, StateMetaInfo> offsetsMap = new HashMap<>(); + new ByteStreamStateHandle("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET)); + Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(); offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.UNION)); if (chainIdx != noOperatorStateBackendAtIndex) { OperatorStateHandle operatorStateHandleBackend = - new OperatorStateHandle(offsetsMap, operatorStateBackend); + new OperatorStreamStateHandle(offsetsMap, operatorStateBackend); operatorStatesBackend.add(operatorStateHandleBackend); } if (chainIdx != noOperatorStateStreamAtIndex) { OperatorStateHandle operatorStateHandleStream = - new OperatorStateHandle(offsetsMap, operatorStateStream); + new OperatorStreamStateHandle(offsetsMap, operatorStateStream); operatorStatesStream.add(operatorStateHandleStream); } } @@ -284,7 +284,7 @@ public class CheckpointTestUtils { } public static StreamStateHandle createDummyStreamStateHandle(Random rnd) { - return new TestByteStreamStateHandleDeepCompare( + return new ByteStreamStateHandle( String.valueOf(createRandomUUID(rnd)), String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET)); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 8f1e12c..005dd98 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -37,7 +37,9 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -68,6 +70,7 @@ import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManager; @@ -79,7 +82,6 @@ import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore; import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore; -import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -95,6 +97,7 @@ import akka.pattern.Patterns; import akka.testkit.CallingThreadDispatcher; import akka.testkit.JavaTestKit; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -105,6 +108,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -479,17 +483,21 @@ public class JobManagerHARecoveryTest extends TestLogger { OperatorID operatorID = OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()); TaskStateManager taskStateManager = getEnvironment().getTaskStateManager(); - OperatorSubtaskState subtaskState = taskStateManager.operatorStates(operatorID); + PrioritizedOperatorSubtaskState subtaskState = taskStateManager.prioritizedOperatorState(operatorID); - if(subtaskState != null) { - int subtaskIndex = getIndexInSubtaskGroup(); - if (subtaskIndex < BlockingStatefulInvokable.recoveredStates.length) { - OperatorStateHandle operatorStateHandle = subtaskState.getManagedOperatorState().iterator().next(); + int subtaskIndex = getIndexInSubtaskGroup(); + if (subtaskIndex < BlockingStatefulInvokable.recoveredStates.length) { + Iterator<OperatorStateHandle> iterator = + subtaskState.getJobManagerManagedOperatorState().iterator(); + + if (iterator.hasNext()) { + OperatorStateHandle operatorStateHandle = iterator.next(); try (FSDataInputStream in = operatorStateHandle.openInputStream()) { BlockingStatefulInvokable.recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(in, getUserCodeClassLoader()); } } + Assert.assertFalse(iterator.hasNext()); } LATCH.await(); @@ -516,7 +524,7 @@ public class JobManagerHARecoveryTest extends TestLogger { @Override public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception { - ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare( + ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle( String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId())); @@ -525,16 +533,16 @@ public class JobManagerHARecoveryTest extends TestLogger { "test-state", new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); - OperatorStateHandle operatorStateHandle = new OperatorStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle); + OperatorStateHandle operatorStateHandle = new OperatorStreamStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle); TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot(); checkpointStateHandles.putSubtaskStateByOperatorID( OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()), new OperatorSubtaskState( - Collections.singletonList(operatorStateHandle), - Collections.emptyList(), - Collections.emptyList(), - Collections.emptyList())); + StateObjectCollection.singleton(operatorStateHandle), + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.empty())); getEnvironment().acknowledgeCheckpoint( checkpointMetaData.getCheckpointId(), http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java index 9454d90..c1a7b53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.jobmanager; -import akka.actor.ActorRef; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.concurrent.Executors; @@ -28,11 +27,12 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGr import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.RetrievableStreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; + +import akka.actor.ActorRef; import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -65,7 +65,7 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends TestLogger { private final static RetrievableStateStorageHelper<SubmittedJobGraph> localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() { @Override public RetrievableStateHandle<SubmittedJobGraph> store(SubmittedJobGraph state) throws IOException { - ByteStreamStateHandle byteStreamStateHandle = new TestByteStreamStateHandleDeepCompare( + ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle( String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(state)); return new RetrievableStreamStateHandle<>(byteStreamStateHandle); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index 4186255..b7aa97d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -91,7 +91,7 @@ public class SchedulerTestUtils { when(vertex.toString()).thenReturn("TEST-VERTEX"); when(vertex.getJobVertex()).thenReturn(executionJobVertex); when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); - + Execution execution = mock(Execution.class); when(execution.getVertex()).thenReturn(vertex); @@ -126,6 +126,7 @@ public class SchedulerTestUtils { ExecutionVertex vertex = mock(ExecutionVertex.class); when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures); + when(vertex.getPreferredLocations()).thenReturn(preferredLocationFutures); when(vertex.getJobId()).thenReturn(new JobID()); when(vertex.toString()).thenReturn("TEST-VERTEX"); when(vertex.getJobVertex()).thenReturn(executionJobVertex); @@ -152,7 +153,7 @@ public class SchedulerTestUtils { when(vertex.toString()).thenReturn("TEST-VERTEX"); when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("TEST-VERTEX"); when(vertex.getJobVertex()).thenReturn(executionJobVertex); - + Execution execution = mock(Execution.class); when(execution.getVertex()).thenReturn(vertex); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index 1798851..db04023 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; import org.apache.flink.runtime.jobmanager.JobManager; @@ -98,6 +99,7 @@ public class TaskManagerMetricsTest extends TestLogger { TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration( taskManagerServicesConfiguration, tmResourceID, + Executors.directExecutor(), EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(), EnvironmentInformation.getMaxJvmHeapMemory()); @@ -115,6 +117,7 @@ public class TaskManagerMetricsTest extends TestLogger { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), + taskManagerServices.getTaskManagerStateStore(), highAvailabilityServices, taskManagerMetricGroup); http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index 3c73e3d..7d1a777 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; @@ -59,12 +60,17 @@ public class DummyEnvironment implements Environment { private TaskStateManager taskStateManager; private final AccumulatorRegistry accumulatorRegistry = new AccumulatorRegistry(jobId, executionId); + public DummyEnvironment() { + this("Test Job", 1, 0, 1); + } + public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex) { this(taskName, numSubTasks, subTaskIndex, numSubTasks); } public DummyEnvironment(String taskName, int numSubTasks, int subTaskIndex, int maxParallelism) { this.taskInfo = new TaskInfo(taskName, maxParallelism, subTaskIndex, numSubTasks, 0); + this.taskStateManager = new TestTaskStateManager(); } public void setKvStateRegistry(KvStateRegistry kvStateRegistry) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 4d1037e..ce19a5e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -89,7 +89,9 @@ public class MockEnvironment implements Environment, AutoCloseable { private final List<ResultPartitionWriter> outputs; - private final JobID jobID = new JobID(); + private final JobID jobID; + + private final JobVertexID jobVertexID; private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager(); @@ -170,11 +172,11 @@ public class MockEnvironment implements Environment, AutoCloseable { bufferSize, taskConfiguration, executionConfig, + taskStateManager, maxParallelism, parallelism, subtaskIndex, - Thread.currentThread().getContextClassLoader(), - taskStateManager); + Thread.currentThread().getContextClassLoader()); } @@ -185,11 +187,45 @@ public class MockEnvironment implements Environment, AutoCloseable { int bufferSize, Configuration taskConfiguration, ExecutionConfig executionConfig, + TaskStateManager taskStateManager, int maxParallelism, int parallelism, int subtaskIndex, - ClassLoader userCodeClassLoader, - TaskStateManager taskStateManager) { + ClassLoader userCodeClassLoader) { + this( + new JobID(), + new JobVertexID(), + taskName, + memorySize, + inputSplitProvider, + bufferSize, + taskConfiguration, + executionConfig, + taskStateManager, + maxParallelism, + parallelism, + subtaskIndex, + userCodeClassLoader); + } + + public MockEnvironment( + JobID jobID, + JobVertexID jobVertexID, + String taskName, + long memorySize, + MockInputSplitProvider inputSplitProvider, + int bufferSize, + Configuration taskConfiguration, + ExecutionConfig executionConfig, + TaskStateManager taskStateManager, + int maxParallelism, + int parallelism, + int subtaskIndex, + ClassLoader userCodeClassLoader) { + + this.jobID = jobID; + this.jobVertexID = jobVertexID; + this.taskInfo = new TaskInfo(taskName, maxParallelism, subtaskIndex, parallelism, 0); this.jobConfiguration = new Configuration(); this.taskConfiguration = taskConfiguration; @@ -325,7 +361,7 @@ public class MockEnvironment implements Environment, AutoCloseable { @Override public JobVertexID getJobVertexId() { - return new JobVertexID(new byte[16]); + return jobVertexID; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java new file mode 100644 index 0000000..2af25d9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProviderTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.util.MethodForwardingTestUtil; +import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; + +public class CheckpointStreamWithResultProviderTest extends TestLogger { + + private static TemporaryFolder temporaryFolder; + + @BeforeClass + public static void beforeClass() throws IOException { + temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + } + + @AfterClass + public static void afterClass() { + temporaryFolder.delete(); + } + + @Test + public void testFactory() throws Exception { + + CheckpointStreamFactory primaryFactory = createCheckpointStreamFactory(); + try ( + CheckpointStreamWithResultProvider primaryOnly = + CheckpointStreamWithResultProvider.createSimpleStream( + CheckpointedStateScope.EXCLUSIVE, + primaryFactory)) { + + Assert.assertTrue(primaryOnly instanceof CheckpointStreamWithResultProvider.PrimaryStreamOnly); + } + + LocalRecoveryDirectoryProvider directoryProvider = createLocalRecoveryDirectoryProvider(); + try ( + CheckpointStreamWithResultProvider primaryAndSecondary = + CheckpointStreamWithResultProvider.createDuplicatingStream( + 42L, + CheckpointedStateScope.EXCLUSIVE, + primaryFactory, + directoryProvider)) { + + Assert.assertTrue(primaryAndSecondary instanceof CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream); + } + } + + @Test + public void testCloseAndFinalizeCheckpointStreamResultPrimaryOnly() throws Exception { + CheckpointStreamFactory primaryFactory = createCheckpointStreamFactory(); + + CheckpointStreamWithResultProvider resultProvider = + CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, primaryFactory); + + SnapshotResult<StreamStateHandle> result = writeCheckpointTestData(resultProvider); + + Assert.assertNotNull(result.getJobManagerOwnedSnapshot()); + Assert.assertNull(result.getTaskLocalSnapshot()); + + try (FSDataInputStream inputStream = result.getJobManagerOwnedSnapshot().openInputStream()) { + Assert.assertEquals(0x42, inputStream.read()); + Assert.assertEquals(-1, inputStream.read()); + } + } + + @Test + public void testCloseAndFinalizeCheckpointStreamResultPrimaryAndSecondary() throws Exception { + CheckpointStreamFactory primaryFactory = createCheckpointStreamFactory(); + LocalRecoveryDirectoryProvider directoryProvider = createLocalRecoveryDirectoryProvider(); + + CheckpointStreamWithResultProvider resultProvider = + CheckpointStreamWithResultProvider.createDuplicatingStream( + 42L, + CheckpointedStateScope.EXCLUSIVE, + primaryFactory, + directoryProvider); + + SnapshotResult<StreamStateHandle> result = writeCheckpointTestData(resultProvider); + + Assert.assertNotNull(result.getJobManagerOwnedSnapshot()); + Assert.assertNotNull(result.getTaskLocalSnapshot()); + + try (FSDataInputStream inputStream = result.getJobManagerOwnedSnapshot().openInputStream()) { + Assert.assertEquals(0x42, inputStream.read()); + Assert.assertEquals(-1, inputStream.read()); + } + + try (FSDataInputStream inputStream = result.getTaskLocalSnapshot().openInputStream()) { + Assert.assertEquals(0x42, inputStream.read()); + Assert.assertEquals(-1, inputStream.read()); + } + } + + @Test + public void testCompletedAndCloseStateHandling() throws Exception { + CheckpointStreamFactory primaryFactory = createCheckpointStreamFactory(); + + testCloseBeforeComplete(new CheckpointStreamWithResultProvider.PrimaryStreamOnly( + primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))); + testCompleteBeforeClose(new CheckpointStreamWithResultProvider.PrimaryStreamOnly( + primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))); + + testCloseBeforeComplete(new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream( + primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), + primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))); + testCompleteBeforeClose(new CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream( + primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), + primaryFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE))); + } + + @Test + public void testCloseMethodForwarding() throws Exception { + CheckpointStreamFactory streamFactory = createCheckpointStreamFactory(); + + MethodForwardingTestUtil.testMethodForwarding( + Closeable.class, + CheckpointStreamWithResultProvider.PrimaryStreamOnly::new, + () -> { + try { + return streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + MethodForwardingTestUtil.testMethodForwarding( + Closeable.class, + CheckpointStreamWithResultProvider.PrimaryAndSecondaryStream::new, + () -> { + try { + return new DuplicatingCheckpointOutputStream( + streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE), + streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + private SnapshotResult<StreamStateHandle> writeCheckpointTestData( + CheckpointStreamWithResultProvider resultProvider) throws IOException { + + CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = + resultProvider.getCheckpointOutputStream(); + checkpointOutputStream.write(0x42); + return resultProvider.closeAndFinalizeCheckpointStreamResult(); + } + + private CheckpointStreamFactory createCheckpointStreamFactory() { + return new MemCheckpointStreamFactory(16 * 1024); + } + + /** + * Test that an exception is thrown if the stream was already closed before and we ask for a result later. + */ + private void testCloseBeforeComplete(CheckpointStreamWithResultProvider resultProvider) throws IOException { + resultProvider.getCheckpointOutputStream().write(0x42); + resultProvider.close(); + try { + resultProvider.closeAndFinalizeCheckpointStreamResult(); + Assert.fail(); + } catch (IOException ignore) { + } + } + + private void testCompleteBeforeClose(CheckpointStreamWithResultProvider resultProvider) throws IOException { + resultProvider.getCheckpointOutputStream().write(0x42); + Assert.assertNotNull(resultProvider.closeAndFinalizeCheckpointStreamResult()); + resultProvider.close(); + } + + private LocalRecoveryDirectoryProvider createLocalRecoveryDirectoryProvider() throws IOException { + File localStateDir = temporaryFolder.newFolder(); + JobID jobID = new JobID(); + JobVertexID jobVertexID = new JobVertexID(); + int subtaskIdx = 0; + return new LocalRecoveryDirectoryProviderImpl(localStateDir, jobID, jobVertexID, subtaskIdx); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java new file mode 100644 index 0000000..886dbdd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/DuplicatingCheckpointOutputStreamTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.commons.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Random; + +public class DuplicatingCheckpointOutputStreamTest extends TestLogger { + + /** + * Test that all writes are duplicated to both streams and that the state reflects what was written. + */ + @Test + public void testDuplicatedWrite() throws Exception { + int streamCapacity = 1024 * 1024; + TestMemoryCheckpointOutputStream primaryStream = new TestMemoryCheckpointOutputStream(streamCapacity); + TestMemoryCheckpointOutputStream secondaryStream = new TestMemoryCheckpointOutputStream(streamCapacity); + TestMemoryCheckpointOutputStream referenceStream = new TestMemoryCheckpointOutputStream(streamCapacity); + DuplicatingCheckpointOutputStream duplicatingStream = + new DuplicatingCheckpointOutputStream(primaryStream, secondaryStream, 64); + Random random = new Random(42); + for (int i = 0; i < 500; ++i) { + int choice = random.nextInt(3); + if (choice == 0) { + int val = random.nextInt(); + referenceStream.write(val); + duplicatingStream.write(val); + } else { + byte[] bytes = new byte[random.nextInt(128)]; + random.nextBytes(bytes); + if (choice == 1) { + referenceStream.write(bytes); + duplicatingStream.write(bytes); + } else { + int off = bytes.length > 0 ? random.nextInt(bytes.length) : 0; + int len = bytes.length > 0 ? random.nextInt(bytes.length - off) : 0; + referenceStream.write(bytes, off, len); + duplicatingStream.write(bytes, off, len); + } + } + Assert.assertEquals(referenceStream.getPos(), duplicatingStream.getPos()); + } + + StreamStateHandle refStateHandle = referenceStream.closeAndGetHandle(); + StreamStateHandle primaryStateHandle = duplicatingStream.closeAndGetPrimaryHandle(); + StreamStateHandle secondaryStateHandle = duplicatingStream.closeAndGetSecondaryHandle(); + + Assert.assertTrue(CommonTestUtils.isSteamContentEqual( + refStateHandle.openInputStream(), + primaryStateHandle.openInputStream())); + + Assert.assertTrue(CommonTestUtils.isSteamContentEqual( + refStateHandle.openInputStream(), + secondaryStateHandle.openInputStream())); + + refStateHandle.discardState(); + primaryStateHandle.discardState(); + secondaryStateHandle.discardState(); + } + + /** + * This is the first of a set of tests that check that exceptions from the secondary stream do not impact that we + * can create a result for the first stream. + */ + @Test + public void testSecondaryWriteFail() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingSecondary(); + testFailingSecondaryStream(duplicatingStream, () -> { + for (int i = 0; i < 128; i++) { + duplicatingStream.write(42); + } + }); + } + + @Test + public void testFailingSecondaryWriteArrayFail() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingSecondary(); + testFailingSecondaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512])); + } + + @Test + public void testFailingSecondaryWriteArrayOffsFail() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingSecondary(); + testFailingSecondaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512], 20, 130)); + } + + @Test + public void testFailingSecondaryFlush() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingSecondary(); + testFailingSecondaryStream(duplicatingStream, duplicatingStream::flush); + } + + @Test + public void testFailingSecondarySync() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingSecondary(); + testFailingSecondaryStream(duplicatingStream, duplicatingStream::sync); + } + + /** + * This is the first of a set of tests that check that exceptions from the primary stream are immediately reported. + */ + @Test + public void testPrimaryWriteFail() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingPrimary(); + testFailingPrimaryStream(duplicatingStream, () -> { + for (int i = 0; i < 128; i++) { + duplicatingStream.write(42); + } + }); + } + + @Test + public void testFailingPrimaryWriteArrayFail() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingPrimary(); + testFailingPrimaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512])); + } + + @Test + public void testFailingPrimaryWriteArrayOffsFail() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingPrimary(); + testFailingPrimaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512], 20, 130)); + } + + @Test + public void testFailingPrimaryFlush() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingPrimary(); + testFailingPrimaryStream(duplicatingStream, duplicatingStream::flush); + } + + @Test + public void testFailingPrimarySync() throws Exception { + DuplicatingCheckpointOutputStream duplicatingStream = createDuplicatingStreamWithFailingPrimary(); + testFailingPrimaryStream(duplicatingStream, duplicatingStream::sync); + } + + /** + * Tests that an exception from interacting with the secondary stream does not effect duplicating to the primary + * stream, but is reflected later when we want the secondary state handle. + */ + private void testFailingSecondaryStream( + DuplicatingCheckpointOutputStream duplicatingStream, + StreamTestMethod testMethod) throws Exception { + + testMethod.call(); + + duplicatingStream.write(42); + + FailingCheckpointOutStream secondary = + (FailingCheckpointOutStream) duplicatingStream.getSecondaryOutputStream(); + + Assert.assertTrue(secondary.isClosed()); + + long pos = duplicatingStream.getPos(); + StreamStateHandle primaryHandle = duplicatingStream.closeAndGetPrimaryHandle(); + + if (primaryHandle != null) { + Assert.assertEquals(pos, primaryHandle.getStateSize()); + } + + try { + duplicatingStream.closeAndGetSecondaryHandle(); + Assert.fail(); + } catch (IOException ioEx) { + Assert.assertEquals(ioEx.getCause(), duplicatingStream.getSecondaryStreamException()); + } + } + + /** + * Test that a failing primary stream brings up an exception. + */ + private void testFailingPrimaryStream( + DuplicatingCheckpointOutputStream duplicatingStream, + StreamTestMethod testMethod) throws Exception { + try { + testMethod.call(); + Assert.fail(); + } catch (IOException ignore) { + } finally { + IOUtils.closeQuietly(duplicatingStream); + } + } + + /** + * Tests that in case of unaligned stream positions, the secondary stream is closed and the primary still works. + * This is important because some code may rely on seeking to stream offsets in the created state files and if the + * streams are not aligned this code could fail. + */ + @Test + public void testUnalignedStreamsException() throws IOException { + int streamCapacity = 1024 * 1024; + TestMemoryCheckpointOutputStream primaryStream = new TestMemoryCheckpointOutputStream(streamCapacity); + TestMemoryCheckpointOutputStream secondaryStream = new TestMemoryCheckpointOutputStream(streamCapacity); + + primaryStream.write(42); + + DuplicatingCheckpointOutputStream stream = + new DuplicatingCheckpointOutputStream(primaryStream, secondaryStream); + + Assert.assertNotNull(stream.getSecondaryStreamException()); + Assert.assertTrue(secondaryStream.isClosed()); + + stream.write(23); + + try { + stream.closeAndGetSecondaryHandle(); + Assert.fail(); + } catch (IOException ignore) { + Assert.assertEquals(ignore.getCause(), stream.getSecondaryStreamException()); + } + + StreamStateHandle primaryHandle = stream.closeAndGetPrimaryHandle(); + + try (FSDataInputStream inputStream = primaryHandle.openInputStream();) { + Assert.assertEquals(42, inputStream.read()); + Assert.assertEquals(23, inputStream.read()); + Assert.assertEquals(-1, inputStream.read()); + } + } + + /** + * Helper + */ + private DuplicatingCheckpointOutputStream createDuplicatingStreamWithFailingSecondary() throws IOException { + int streamCapacity = 1024 * 1024; + TestMemoryCheckpointOutputStream primaryStream = new TestMemoryCheckpointOutputStream(streamCapacity); + FailingCheckpointOutStream failSecondaryStream = new FailingCheckpointOutStream(); + return new DuplicatingCheckpointOutputStream(primaryStream, failSecondaryStream, 64); + } + + private DuplicatingCheckpointOutputStream createDuplicatingStreamWithFailingPrimary() throws IOException { + int streamCapacity = 1024 * 1024; + FailingCheckpointOutStream failPrimaryStream = new FailingCheckpointOutStream(); + TestMemoryCheckpointOutputStream secondary = new TestMemoryCheckpointOutputStream(streamCapacity); + return new DuplicatingCheckpointOutputStream(failPrimaryStream, secondary, 64); + } + + /** + * Stream that throws {@link IOException} on all relevant methods under test. + */ + private static class FailingCheckpointOutStream extends CheckpointStreamFactory.CheckpointStateOutputStream { + + private boolean closed = false; + + @Nullable + @Override + public StreamStateHandle closeAndGetHandle() throws IOException { + throw new IOException(); + } + + @Override + public long getPos() throws IOException { + return 0; + } + + @Override + public void write(int b) throws IOException { + throw new IOException(); + } + + @Override + public void flush() throws IOException { + throw new IOException(); + } + + @Override + public void sync() throws IOException { + throw new IOException(); + } + + @Override + public void close() throws IOException { + this.closed = true; + } + + public boolean isClosed() { + return closed; + } + } + + @FunctionalInterface + private interface StreamTestMethod { + void call() throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java new file mode 100644 index 0000000..cc97c0e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/LocalRecoveryDirectoryProviderImplTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +/** + * Tests for {@link LocalRecoveryDirectoryProvider}. + */ +public class LocalRecoveryDirectoryProviderImplTest extends TestLogger { + + private static final JobID JOB_ID = new JobID(); + private static final JobVertexID JOB_VERTEX_ID = new JobVertexID(); + private static final int SUBTASK_INDEX = 0; + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + private LocalRecoveryDirectoryProviderImpl directoryProvider; + private File[] allocBaseFolders; + + @Before + public void setup() throws IOException { + this.allocBaseFolders = new File[]{tmpFolder.newFolder(), tmpFolder.newFolder(), tmpFolder.newFolder()}; + this.directoryProvider = new LocalRecoveryDirectoryProviderImpl( + allocBaseFolders, + JOB_ID, + JOB_VERTEX_ID, + SUBTASK_INDEX); + } + + @Test + public void allocationBaseDir() { + for (int i = 0; i < 10; ++i) { + Assert.assertEquals(allocBaseFolders[i % allocBaseFolders.length], directoryProvider.allocationBaseDirectory(i)); + } + } + + @Test + public void selectAllocationBaseDir() { + for (int i = 0; i < allocBaseFolders.length; ++i) { + Assert.assertEquals(allocBaseFolders[i], directoryProvider.selectAllocationBaseDirectory(i)); + } + } + + @Test + public void allocationBaseDirectoriesCount() { + Assert.assertEquals(allocBaseFolders.length, directoryProvider.allocationBaseDirsCount()); + } + + @Test + public void subtaskSpecificDirectory() { + for (int i = 0; i < 10; ++i) { + Assert.assertEquals( + new File( + directoryProvider.allocationBaseDirectory(i), + directoryProvider.subtaskDirString()), + directoryProvider.subtaskBaseDirectory(i)); + } + } + + @Test + public void subtaskCheckpointSpecificDirectory() { + for (int i = 0; i < 10; ++i) { + Assert.assertEquals( + new File( + directoryProvider.subtaskBaseDirectory(i), + directoryProvider.checkpointDirString(i)), + directoryProvider.subtaskSpecificCheckpointDirectory(i)); + } + } + + @Test + public void testPathStringConstants() { + + Assert.assertEquals( + directoryProvider.subtaskDirString(), + "jid_" + JOB_ID + Path.SEPARATOR + "vtx_" + JOB_VERTEX_ID + "_sti_" + SUBTASK_INDEX); + + final long checkpointId = 42; + Assert.assertEquals( + directoryProvider.checkpointDirString(checkpointId), + "chk_" + checkpointId); + } + + @Test + public void testPreconditionsNotNullFiles() { + try { + new LocalRecoveryDirectoryProviderImpl(new File[]{null}, JOB_ID, JOB_VERTEX_ID, SUBTASK_INDEX); + Assert.fail(); + } catch (NullPointerException ignore) { + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index 4ac64e0..23493b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -27,12 +27,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.util.FutureUtil; - import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -43,7 +43,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; import java.io.Serializable; -import java.util.Collections; import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; @@ -51,7 +50,6 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests for the {@link org.apache.flink.runtime.state.memory.MemoryStateBackend}. @@ -94,13 +92,11 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack */ @Test public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() throws Exception { + DummyEnvironment env = new DummyEnvironment(); AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); - Environment env = mock(Environment.class); - when(env.getExecutionConfig()).thenReturn(new ExecutionConfig()); - when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader()); - - OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name"); + OperatorStateBackend operatorStateBackend = + abstractStateBackend.createOperatorStateBackend(env, "test-op-name"); // write some state ListStateDescriptor<Serializable> stateDescriptor1 = new ListStateDescriptor<>("test1", new JavaSerializer<>()); @@ -124,9 +120,11 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE); - RunnableFuture<OperatorStateHandle> runnableFuture = + RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); - OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture); + + SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture); + OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); try { @@ -143,7 +141,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); - operatorStateBackend.restore(Collections.singletonList(stateHandle)); + operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle)); fail("The operator state restore should have failed if the previous state serializer could not be loaded."); } catch (IOException expected) { @@ -186,10 +184,6 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack // ========== restore snapshot ========== - Environment env = mock(Environment.class); - when(env.getExecutionConfig()).thenReturn(new ExecutionConfig()); - when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader()); - // mock failure when deserializing serializer TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class); @@ -197,7 +191,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); try { - restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, new DummyEnvironment()); fail("The keyed state restore should have failed if the previous state serializer could not be loaded."); } catch (IOException expected) { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java deleted file mode 100644 index dd34f03..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MultiStreamStateHandleTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.junit.Before; -import org.junit.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -public class MultiStreamStateHandleTest { - - private static final int TEST_DATA_LENGTH = 123; - private Random random; - private byte[] testData; - private List<StreamStateHandle> streamStateHandles; - - @Before - public void setup() { - random = new Random(0x42); - testData = new byte[TEST_DATA_LENGTH]; - for (int i = 0; i < testData.length; ++i) { - testData[i] = (byte) i; - } - - int idx = 0; - streamStateHandles = new ArrayList<>(); - while (idx < testData.length) { - int len = random.nextInt(5); - byte[] sub = Arrays.copyOfRange(testData, idx, idx + len); - streamStateHandles.add(new ByteStreamStateHandle(String.valueOf(idx), sub)); - idx += len; - } - } - - @Test - public void testMetaData() throws IOException { - MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles); - assertEquals(TEST_DATA_LENGTH, multiStreamStateHandle.getStateSize()); - } - - @Test - public void testLinearRead() throws IOException { - MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles); - try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) { - - for (int i = 0; i < TEST_DATA_LENGTH; ++i) { - assertEquals(i, in.getPos()); - assertEquals(testData[i], in.read()); - } - - assertEquals(-1, in.read()); - assertEquals(TEST_DATA_LENGTH, in.getPos()); - assertEquals(-1, in.read()); - assertEquals(TEST_DATA_LENGTH, in.getPos()); - } - } - - @Test - public void testRandomRead() throws IOException { - - MultiStreamStateHandle multiStreamStateHandle = new MultiStreamStateHandle(streamStateHandles); - - try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) { - - for (int i = 0; i < 1000; ++i) { - int pos = random.nextInt(TEST_DATA_LENGTH); - int readLen = random.nextInt(TEST_DATA_LENGTH); - in.seek(pos); - while (--readLen > 0 && pos < TEST_DATA_LENGTH) { - assertEquals(pos, in.getPos()); - assertEquals(testData[pos++], in.read()); - } - } - - in.seek(TEST_DATA_LENGTH); - assertEquals(TEST_DATA_LENGTH, in.getPos()); - assertEquals(-1, in.read()); - - try { - in.seek(TEST_DATA_LENGTH + 1); - fail(); - } catch (Exception ignored) { - - } - } - } - - @Test - public void testEmptyList() throws IOException { - - MultiStreamStateHandle multiStreamStateHandle = - new MultiStreamStateHandle(Collections.<StreamStateHandle>emptyList()); - - try (FSDataInputStream in = multiStreamStateHandle.openInputStream()) { - - assertEquals(0, in.getPos()); - in.seek(0); - assertEquals(0, in.getPos()); - assertEquals(-1, in.read()); - - try { - in.seek(1); - fail(); - } catch (Exception ignored) { - - } - } - } -} \ No newline at end of file
