Repository: flink Updated Branches: refs/heads/master 30bb958a7 -> ab014ef94
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java new file mode 100644 index 0000000..08896da --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteStateTableTest.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.state.ArrayListSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class CopyOnWriteStateTableTest { + + /** + * Testing the basic map operations. + */ + @Test + public void testPutGetRemoveContainsTransform() throws Exception { + RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. 
+ + final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + ArrayList<Integer> state_1_1 = new ArrayList<>(); + state_1_1.add(41); + ArrayList<Integer> state_2_1 = new ArrayList<>(); + state_2_1.add(42); + ArrayList<Integer> state_1_2 = new ArrayList<>(); + state_1_2.add(43); + + Assert.assertNull(stateTable.putAndGetOld(1, 1, state_1_1)); + Assert.assertEquals(state_1_1, stateTable.get(1, 1)); + Assert.assertEquals(1, stateTable.size()); + + Assert.assertNull(stateTable.putAndGetOld(2, 1, state_2_1)); + Assert.assertEquals(state_2_1, stateTable.get(2, 1)); + Assert.assertEquals(2, stateTable.size()); + + Assert.assertNull(stateTable.putAndGetOld(1, 2, state_1_2)); + Assert.assertEquals(state_1_2, stateTable.get(1, 2)); + Assert.assertEquals(3, stateTable.size()); + + Assert.assertTrue(stateTable.containsKey(2, 1)); + Assert.assertFalse(stateTable.containsKey(3, 1)); + Assert.assertFalse(stateTable.containsKey(2, 3)); + stateTable.put(2, 1, null); + Assert.assertTrue(stateTable.containsKey(2, 1)); + Assert.assertEquals(3, stateTable.size()); + Assert.assertNull(stateTable.get(2, 1)); + stateTable.put(2, 1, state_2_1); + Assert.assertEquals(3, stateTable.size()); + + Assert.assertEquals(state_2_1, stateTable.removeAndGetOld(2, 1)); + Assert.assertFalse(stateTable.containsKey(2, 1)); + Assert.assertEquals(2, stateTable.size()); + + stateTable.remove(1, 2); + Assert.assertFalse(stateTable.containsKey(1, 2)); + Assert.assertEquals(1, stateTable.size()); + + Assert.assertNull(stateTable.removeAndGetOld(4, 2)); + Assert.assertEquals(1, stateTable.size()); + + StateTransformationFunction<ArrayList<Integer>, Integer> function = + new StateTransformationFunction<ArrayList<Integer>, Integer>() { + @Override + public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) 
throws Exception { + previousState.add(value); + return previousState; + } + }; + + final int value = 4711; + stateTable.transform(1, 1, value, function); + state_1_1 = function.apply(state_1_1, value); + Assert.assertEquals(state_1_1, stateTable.get(1, 1)); + } + + /** + * This test triggers incremental rehash and tests for corruptions. + */ + @Test + public void testIncrementalRehash() { + RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. + + final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + int insert = 0; + int remove = 0; + while (!stateTable.isRehashing()) { + stateTable.put(insert++, 0, new ArrayList<Integer>()); + if (insert % 8 == 0) { + stateTable.remove(remove++, 0); + } + } + Assert.assertEquals(insert - remove, stateTable.size()); + while (stateTable.isRehashing()) { + stateTable.put(insert++, 0, new ArrayList<Integer>()); + if (insert % 8 == 0) { + stateTable.remove(remove++, 0); + } + } + Assert.assertEquals(insert - remove, stateTable.size()); + + for (int i = 0; i < insert; ++i) { + if (i < remove) { + Assert.assertFalse(stateTable.containsKey(i, 0)); + } else { + Assert.assertTrue(stateTable.containsKey(i, 0)); + } + } + } + + /** + * This test does some random modifications to a state table and a reference (hash map). Then draws snapshots, + * performs more modifications and checks snapshot integrity. 
+ */ + @Test + public void testRandomModificationsAndCopyOnWriteIsolation() throws Exception { + + final RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. + + final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + final HashMap<Tuple2<Integer, Integer>, ArrayList<Integer>> referenceMap = new HashMap<>(); + + final Random random = new Random(42); + + // holds snapshots from the map under test + CopyOnWriteStateTable.StateTableEntry<Integer, Integer, ArrayList<Integer>>[] snapshot = null; + int snapshotSize = 0; + + // holds a reference snapshot from our reference map that we compare against + Tuple3<Integer, Integer, ArrayList<Integer>>[] reference = null; + + int val = 0; + + + int snapshotCounter = 0; + int referencedSnapshotId = 0; + + final StateTransformationFunction<ArrayList<Integer>, Integer> transformationFunction = + new StateTransformationFunction<ArrayList<Integer>, Integer>() { + @Override + public ArrayList<Integer> apply(ArrayList<Integer> previousState, Integer value) throws Exception { + if (previousState == null) { + previousState = new ArrayList<>(); + } + previousState.add(value); + // we give back the original, attempting to spot errors in the copy-on-write + return previousState; + } + }; + + // the main loop for modifications + for (int i = 0; i < 10_000_000; ++i) { + + int key = random.nextInt(20); + int namespace = random.nextInt(4); + Tuple2<Integer, Integer> compositeKey = new Tuple2<>(key, namespace); + + int op = random.nextInt(7); + + ArrayList<Integer> state = null; + ArrayList<Integer> referenceState = null; + + switch (op) { + case 0: + 
case 1: { + state = stateTable.get(key, namespace); + referenceState = referenceMap.get(compositeKey); + if (null == state) { + state = new ArrayList<>(); + stateTable.put(key, namespace, state); + referenceState = new ArrayList<>(); + referenceMap.put(compositeKey, referenceState); + } + break; + } + case 2: { + stateTable.put(key, namespace, new ArrayList<Integer>()); + referenceMap.put(compositeKey, new ArrayList<Integer>()); + break; + } + case 3: { + state = stateTable.putAndGetOld(key, namespace, new ArrayList<Integer>()); + referenceState = referenceMap.put(compositeKey, new ArrayList<Integer>()); + break; + } + case 4: { + stateTable.remove(key, namespace); + referenceMap.remove(compositeKey); + break; + } + case 5: { + state = stateTable.removeAndGetOld(key, namespace); + referenceState = referenceMap.remove(compositeKey); + break; + } + case 6: { + final int updateValue = random.nextInt(1000); + stateTable.transform(key, namespace, updateValue, transformationFunction); + referenceMap.put(compositeKey, transformationFunction.apply( + referenceMap.remove(compositeKey), updateValue)); + break; + } + default: { + Assert.fail("Unknown op-code " + op); + } + } + + Assert.assertEquals(referenceMap.size(), stateTable.size()); + + if (state != null) { + // mutate the states a bit... 
+ if (random.nextBoolean() && !state.isEmpty()) { + state.remove(state.size() - 1); + referenceState.remove(referenceState.size() - 1); + } else { + state.add(val); + referenceState.add(val); + ++val; + } + } + + Assert.assertEquals(referenceState, state); + + // snapshot triggering / comparison / release + if (i > 0 && i % 500 == 0) { + + if (snapshot != null) { + // check our referenced snapshot + deepCheck(reference, convert(snapshot, snapshotSize)); + + if (i % 1_000 == 0) { + // draw and release some other snapshot while holding on to the old snapshot + ++snapshotCounter; + stateTable.snapshotTableArrays(); + stateTable.releaseSnapshot(snapshotCounter); + } + + //release the snapshot after some time + if (i % 5_000 == 0) { + snapshot = null; + reference = null; + snapshotSize = 0; + stateTable.releaseSnapshot(referencedSnapshotId); + } + + } else { + // if there is no more referenced snapshot, we create one + ++snapshotCounter; + referencedSnapshotId = snapshotCounter; + snapshot = stateTable.snapshotTableArrays(); + snapshotSize = stateTable.size(); + reference = manualDeepDump(referenceMap); + } + } + } + } + + /** + * This tests for the copy-on-write contracts, e.g. ensures that no copy-on-write is active after all snapshots are + * released. + */ + @Test + public void testCopyOnWriteContracts() { + RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); // we use mutable state objects. 
+ + final MockInternalKeyContext<Integer> keyContext = new MockInternalKeyContext<>(IntSerializer.INSTANCE); + + final CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> stateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + ArrayList<Integer> originalState1 = new ArrayList<>(1); + ArrayList<Integer> originalState2 = new ArrayList<>(1); + ArrayList<Integer> originalState3 = new ArrayList<>(1); + ArrayList<Integer> originalState4 = new ArrayList<>(1); + ArrayList<Integer> originalState5 = new ArrayList<>(1); + + originalState1.add(1); + originalState2.add(2); + originalState3.add(3); + originalState4.add(4); + originalState5.add(5); + + stateTable.put(1, 1, originalState1); + stateTable.put(2, 1, originalState2); + stateTable.put(4, 1, originalState4); + stateTable.put(5, 1, originalState5); + + // no snapshot taken, we get the original back + Assert.assertTrue(stateTable.get(1, 1) == originalState1); + CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot1 = stateTable.createSnapshot(); + // after snapshot1 is taken, we get a copy... 
+ final ArrayList<Integer> copyState = stateTable.get(1, 1); + Assert.assertFalse(copyState == originalState1); + // ...and the copy is equal + Assert.assertEquals(originalState1, copyState); + + // we make an insert AFTER snapshot1 + stateTable.put(3, 1, originalState3); + + // on repeated lookups, we get the same copy because no further snapshot was taken + Assert.assertTrue(copyState == stateTable.get(1, 1)); + + // we take snapshot2 + CopyOnWriteStateTableSnapshot<Integer, Integer, ArrayList<Integer>> snapshot2 = stateTable.createSnapshot(); + // after the second snapshot, copy-on-write is active again for old entries + Assert.assertFalse(copyState == stateTable.get(1, 1)); + // and equality still holds + Assert.assertEquals(copyState, stateTable.get(1, 1)); + + // after releasing snapshot2 + stateTable.releaseSnapshot(snapshot2); + // we still get the original of the untouched late insert (after snapshot1) + Assert.assertTrue(originalState3 == stateTable.get(3, 1)); + // but copy-on-write is still active for older inserts (before snapshot1) + Assert.assertFalse(originalState4 == stateTable.get(4, 1)); + + // after releasing snapshot1 + stateTable.releaseSnapshot(snapshot1); + // no copy-on-write is active + Assert.assertTrue(originalState5 == stateTable.get(5, 1)); + } + + @SuppressWarnings("unchecked") + private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshot, int mapSize) { + + Tuple3<K, N, S>[] result = new Tuple3[mapSize]; + int pos = 0; + for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : snapshot) { + while (null != entry) { + result[pos++] = new Tuple3<>(entry.getKey(), entry.getNamespace(), entry.getState()); + entry = entry.next; + } + } + Assert.assertEquals(mapSize, pos); + return result; + } + + @SuppressWarnings("unchecked") + private Tuple3<Integer, Integer, ArrayList<Integer>>[] manualDeepDump( + HashMap<Tuple2<Integer, Integer>, + ArrayList<Integer>> map) { + + Tuple3<Integer, 
Integer, ArrayList<Integer>>[] result = new Tuple3[map.size()]; + int pos = 0; + for (Map.Entry<Tuple2<Integer, Integer>, ArrayList<Integer>> entry : map.entrySet()) { + Integer key = entry.getKey().f0; + Integer namespace = entry.getKey().f1; + result[pos++] = new Tuple3<>(key, namespace, new ArrayList<>(entry.getValue())); + } + return result; + } + + private void deepCheck( + Tuple3<Integer, Integer, ArrayList<Integer>>[] a, + Tuple3<Integer, Integer, ArrayList<Integer>>[] b) { + + if (a == b) { + return; + } + + Assert.assertEquals(a.length, b.length); + + Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>> comparator = + new Comparator<Tuple3<Integer, Integer, ArrayList<Integer>>>() { + + @Override + public int compare(Tuple3<Integer, Integer, ArrayList<Integer>> o1, Tuple3<Integer, Integer, ArrayList<Integer>> o2) { + int namespaceDiff = o1.f1 - o2.f1; + return namespaceDiff != 0 ? namespaceDiff : o1.f0 - o2.f0; + } + }; + + Arrays.sort(a, comparator); + Arrays.sort(b, comparator); + + for (int i = 0; i < a.length; ++i) { + Tuple3<Integer, Integer, ArrayList<Integer>> av = a[i]; + Tuple3<Integer, Integer, ArrayList<Integer>> bv = b[i]; + + Assert.assertEquals(av.f0, bv.f0); + Assert.assertEquals(av.f1, bv.f1); + Assert.assertEquals(av.f2, bv.f2); + } + } + + static class MockInternalKeyContext<T> implements InternalKeyContext<T> { + + private T key; + private final TypeSerializer<T> serializer; + private final KeyGroupRange keyGroupRange; + + public MockInternalKeyContext(TypeSerializer<T> serializer) { + this.serializer = serializer; + this.keyGroupRange = new KeyGroupRange(0, 0); + } + + public void setKey(T key) { + this.key = key; + } + + @Override + public T getCurrentKey() { + return key; + } + + @Override + public int getCurrentKeyGroupIndex() { + return 0; + } + + @Override + public int getNumberOfKeyGroups() { + return 1; + } + + @Override + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } + + @Override + public 
TypeSerializer<T> getKeySerializer() { + return serializer; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java index 735b5f5..cb4e403 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java @@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.internal.InternalAggregatingState; - import org.junit.Test; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; /** * Tests for the simple Java heap objects implementation of the {@link AggregatingState}. 
*/ -public class HeapAggregatingStateTest { +public class HeapAggregatingStateTest extends HeapStateBackendTestBase { @Test public void testAddAndGet() throws Exception { @@ -227,20 +222,6 @@ public class HeapAggregatingStateTest { } // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { - return new HeapKeyedStateBackend<>( - mock(TaskKvStateRegistry.class), - StringSerializer.INSTANCE, - HeapAggregatingStateTest.class.getClassLoader(), - 16, - new KeyGroupRange(0, 15), - new ExecutionConfig()); - } - - // ------------------------------------------------------------------------ // test functions // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java new file mode 100644 index 0000000..da0666a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.internal.InternalListState; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.FileInputStream; +import java.net.URL; +import java.util.Collections; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; + +/** + * Tests backwards compatibility in the serialization format of heap-based KeyedStateBackends. + */ +public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackendTestBase { + + /** + * [FLINK-5979] + * + * This test takes a snapshot that was created with Flink 1.2 and tries to restore it in master to check + * the backwards compatibility of the serialization format of {@link StateTable}s. 
+ */ + @Test + public void testRestore1_2ToMaster() throws Exception { + + ClassLoader cl = getClass().getClassLoader(); + URL resource = cl.getResource("heap_keyed_statebackend_1_2.snapshot"); + + Preconditions.checkNotNull(resource, "Binary snapshot resource not found!"); + + final Integer namespace1 = 1; + final Integer namespace2 = 2; + final Integer namespace3 = 3; + + try (final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend()) { + final KeyGroupsStateHandle stateHandle; + try (BufferedInputStream bis = new BufferedInputStream((new FileInputStream(resource.getFile())))) { + stateHandle = InstantiationUtil.deserializeObject(bis, Thread.currentThread().getContextClassLoader()); + } + keyedBackend.restore(Collections.singleton(stateHandle)); + final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class); + stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); + + InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr); + + assertEquals(7, keyedBackend.numStateEntries()); + + keyedBackend.setCurrentKey("abc"); + state.setCurrentNamespace(namespace1); + assertEquals(asList(33L, 55L), state.get()); + state.setCurrentNamespace(namespace2); + assertEquals(asList(22L, 11L), state.get()); + state.setCurrentNamespace(namespace3); + assertEquals(Collections.singletonList(44L), state.get()); + + keyedBackend.setCurrentKey("def"); + state.setCurrentNamespace(namespace1); + assertEquals(asList(11L, 44L), state.get()); + + state.setCurrentNamespace(namespace3); + assertEquals(asList(22L, 55L, 33L), state.get()); + + keyedBackend.setCurrentKey("jkl"); + state.setCurrentNamespace(namespace1); + assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get()); + + keyedBackend.setCurrentKey("mno"); + state.setCurrentNamespace(namespace3); + assertEquals(asList(11L, 22L, 33L, 44L, 55L), state.get()); + } + } + +// /** +// * This code was used to create the binary file of the 
old version's snapshot used by this test. If you need to +// * recreate the binary, you can uncomment this and run it. +// */ +// private void createBinarySnapshot() throws Exception { +// +// final String pathToWrite = "/PATH/TO/WRITE"; +// +// final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class); +// stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); +// +// final Integer namespace1 = 1; +// final Integer namespace2 = 2; +// final Integer namespace3 = 3; +// +// final HeapKeyedStateBackend<String> keyedBackend = createKeyedBackend(); +// +// try { +// InternalListState<Integer, Long> state = keyedBackend.createListState(IntSerializer.INSTANCE, stateDescr); +// +// keyedBackend.setCurrentKey("abc"); +// state.setCurrentNamespace(namespace1); +// state.add(33L); +// state.add(55L); +// +// state.setCurrentNamespace(namespace2); +// state.add(22L); +// state.add(11L); +// +// state.setCurrentNamespace(namespace3); +// state.add(44L); +// +// keyedBackend.setCurrentKey("def"); +// state.setCurrentNamespace(namespace1); +// state.add(11L); +// state.add(44L); +// +// state.setCurrentNamespace(namespace3); +// state.add(22L); +// state.add(55L); +// state.add(33L); +// +// keyedBackend.setCurrentKey("jkl"); +// state.setCurrentNamespace(namespace1); +// state.add(11L); +// state.add(22L); +// state.add(33L); +// state.add(44L); +// state.add(55L); +// +// keyedBackend.setCurrentKey("mno"); +// state.setCurrentNamespace(namespace3); +// state.add(11L); +// state.add(22L); +// state.add(33L); +// state.add(44L); +// state.add(55L); +// RunnableFuture<KeyGroupsStateHandle> snapshot = keyedBackend.snapshot( +// 0L, +// 0L, +// new MemCheckpointStreamFactory(4 * 1024 * 1024), +// CheckpointOptions.forFullCheckpoint()); +// +// snapshot.run(); +// +// try (BufferedOutputStream bis = new BufferedOutputStream(new FileOutputStream(pathToWrite))) { +// InstantiationUtil.serializeObject(bis, snapshot.get()); +// } +// +// 
} finally { +// keyedBackend.close(); +// keyedBackend.dispose(); +// } +// } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java index c36a48b..7705c19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java @@ -22,13 +22,9 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.internal.InternalListState; - import org.junit.Test; import java.util.ArrayList; @@ -39,12 +35,11 @@ import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; /** * Tests for the simple Java heap objects implementation of the {@link ListState}. 
*/ -public class HeapListStateTest { +public class HeapListStateTest extends HeapStateBackendTestBase { @Test public void testAddAndGet() throws Exception { @@ -225,16 +220,6 @@ public class HeapListStateTest { keyedBackend.dispose(); } } - - private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { - return new HeapKeyedStateBackend<>( - mock(TaskKvStateRegistry.class), - StringSerializer.INSTANCE, - HeapListStateTest.class.getClassLoader(), - 16, - new KeyGroupRange(0, 15), - new ExecutionConfig()); - } private static <T> void validateResult(Iterable<T> values, Set<T> expected) { int num = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java index 63eec04..928eaec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java @@ -23,25 +23,20 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.internal.InternalReducingState; - import org.junit.Test; import static java.util.Arrays.asList; import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; /** * Tests for the simple Java heap objects implementation of the {@link ReducingState}. */ -public class HeapReducingStateTest { +public class HeapReducingStateTest extends HeapStateBackendTestBase { @Test public void testAddAndGet() throws Exception { @@ -214,7 +209,7 @@ public class HeapReducingStateTest { keyedBackend.setCurrentKey("mno"); state.setCurrentNamespace(namespace1); state.clear(); - + StateTable<String, Integer, Long> stateTable = ((HeapReducingState<String, Integer, Long>) state).stateTable; @@ -227,20 +222,6 @@ public class HeapReducingStateTest { } // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - private static HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { - return new HeapKeyedStateBackend<>( - mock(TaskKvStateRegistry.class), - StringSerializer.INSTANCE, - HeapReducingStateTest.class.getClassLoader(), - 16, - new KeyGroupRange(0, 15), - new ExecutionConfig()); - } - - // ------------------------------------------------------------------------ // test functions // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java new file mode 100644 index 0000000..e6adef8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java @@ -0,0 +1,54 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; + +import static org.mockito.Mockito.mock; + +@RunWith(Parameterized.class) +public abstract class HeapStateBackendTestBase { + + @Parameterized.Parameters + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter + public boolean async; + + public HeapKeyedStateBackend<String> createKeyedBackend() throws Exception { + return new HeapKeyedStateBackend<>( + mock(TaskKvStateRegistry.class), + StringSerializer.INSTANCE, + HeapReducingStateTest.class.getClassLoader(), + 16, + new KeyGroupRange(0, 15), + async, + new ExecutionConfig()); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java new file mode 100644 index 0000000..6fd94f7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/StateTableSnapshotCompatibilityTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.ArrayListSerializer; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; +import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Random; + +public class StateTableSnapshotCompatibilityTest { + + /** + * This test ensures that different implementations of {@link StateTable} are compatible in their serialization + * format. 
+ */ + @Test + public void checkCompatibleSerializationFormats() throws IOException { + final Random r = new Random(42); + RegisteredBackendStateMetaInfo<Integer, ArrayList<Integer>> metaInfo = + new RegisteredBackendStateMetaInfo<>( + StateDescriptor.Type.UNKNOWN, + "test", + IntSerializer.INSTANCE, + new ArrayListSerializer<>(IntSerializer.INSTANCE)); + + final CopyOnWriteStateTableTest.MockInternalKeyContext<Integer> keyContext = + new CopyOnWriteStateTableTest.MockInternalKeyContext<>(IntSerializer.INSTANCE); + + CopyOnWriteStateTable<Integer, Integer, ArrayList<Integer>> cowStateTable = + new CopyOnWriteStateTable<>(keyContext, metaInfo); + + for (int i = 0; i < 100; ++i) { + ArrayList<Integer> list = new ArrayList<>(5); + int end = r.nextInt(5); + for (int j = 0; j < end; ++j) { + list.add(r.nextInt(100)); + } + + cowStateTable.put(r.nextInt(10), r.nextInt(2), list); + } + + StateTableSnapshot snapshot = cowStateTable.createSnapshot(); + + final NestedMapsStateTable<Integer, Integer, ArrayList<Integer>> nestedMapsStateTable = + new NestedMapsStateTable<>(keyContext, metaInfo); + + restoreStateTableFromSnapshot(nestedMapsStateTable, snapshot, keyContext.getKeyGroupRange()); + snapshot.release(); + + + Assert.assertEquals(cowStateTable.size(), nestedMapsStateTable.size()); + for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) { + Assert.assertEquals(entry.getState(), nestedMapsStateTable.get(entry.getKey(), entry.getNamespace())); + } + + snapshot = nestedMapsStateTable.createSnapshot(); + cowStateTable = new CopyOnWriteStateTable<>(keyContext, metaInfo); + + restoreStateTableFromSnapshot(cowStateTable, snapshot, keyContext.getKeyGroupRange()); + snapshot.release(); + + Assert.assertEquals(nestedMapsStateTable.size(), cowStateTable.size()); + for (StateEntry<Integer, Integer, ArrayList<Integer>> entry : cowStateTable) { + Assert.assertEquals(nestedMapsStateTable.get(entry.getKey(), entry.getNamespace()), entry.getState()); + } + } + + 
private static <K, N, S> void restoreStateTableFromSnapshot( + StateTable<K, N, S> stateTable, + StateTableSnapshot snapshot, + KeyGroupRange keyGroupRange) throws IOException { + + final ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(1024 * 1024); + final DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out); + + for (Integer keyGroup : keyGroupRange) { + snapshot.writeMappingsInKeyGroup(dov, keyGroup); + } + + final ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(out.getBuf()); + final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in); + + final StateTableByKeyGroupReader keyGroupReader = + StateTableByKeyGroupReaders.readerForVersion(stateTable, KeyedBackendSerializationProxy.VERSION); + + for (Integer keyGroup : keyGroupRange) { + keyGroupReader.readMappingsInKeyGroup(div, keyGroup); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java new file mode 100644 index 0000000..291f3ed --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; + +import java.io.IOException; + +/** + * {@link CheckpointStreamFactory} for tests that allows for testing cancellation in async IO + */ +@VisibleForTesting +@Internal +public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { + + private final int maxSize; + private volatile int afterNumberInvocations; + private volatile OneShotLatch blocker; + private volatile OneShotLatch waiter; + + MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; + + public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() { + return lastCreatedStream; + } + + public BlockerCheckpointStreamFactory(int maxSize) { + this.maxSize = maxSize; + } + + public void setAfterNumberInvocations(int afterNumberInvocations) { + this.afterNumberInvocations = afterNumberInvocations; + } + + public void setBlockerLatch(OneShotLatch latch) { + this.blocker = latch; + } + + public void setWaiterLatch(OneShotLatch latch) { + this.waiter = latch; + } + + @Override + public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { + this.lastCreatedStream = new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) { + + private int afterNInvocations = afterNumberInvocations; + private final OneShotLatch streamBlocker = blocker; + private final OneShotLatch streamWaiter = waiter; + + @Override + public void write(int b) throws IOException { + + if (null != waiter) { + waiter.trigger(); + } + + if (afterNInvocations > 0) { + --afterNInvocations; + } + + if (0 == afterNInvocations && null != streamBlocker) { + try { + streamBlocker.await(); + } catch (InterruptedException ignored) { + } + } + try { + super.write(b); + } catch (IOException ex) { + if (null != streamWaiter) { + streamWaiter.trigger(); + } + throw ex; + } + + if (0 == afterNInvocations && null != streamWaiter) { + streamWaiter.trigger(); + } + } + + @Override + public void close() { + super.close(); + if (null != streamWaiter) { + streamWaiter.trigger(); + } + } + }; + + return lastCreatedStream; + } + + @Override + public void close() throws Exception { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot new file mode 100644 index 0000000..b9171bc Binary files /dev/null and b/flink-runtime/src/test/resources/heap_keyed_statebackend_1_2.snapshot differ http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java index 05c89dd..781c320 
100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.util.MathUtils; import java.io.IOException; import java.util.ArrayList; @@ -98,9 +99,7 @@ public class TimeWindow extends Window { @Override public int hashCode() { - int result = (int) (start ^ (start >>> 32)); - result = 31 * result + (int) (end ^ (end >>> 32)); - return result; + return MathUtils.longToIntWithBitMixing(start + end); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 1911f44..5e966d1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -91,7 +91,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } enum StateBackendEnum { - MEM, FILE, ROCKSDB_FULLY_ASYNC + MEM, FILE, ROCKSDB_FULLY_ASYNC, MEM_ASYNC, FILE_ASYNC } @BeforeClass @@ -116,11 +116,19 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog public void initStateBackend() throws IOException { switch (stateBackendEnum) { case 
MEM: - this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE); + this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false); break; case FILE: { String backups = tempFolder.newFolder().getAbsolutePath(); - this.stateBackend = new FsStateBackend("file://" + backups); + this.stateBackend = new FsStateBackend("file://" + backups, false); + break; + } + case MEM_ASYNC: + this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, true); + break; + case FILE_ASYNC: { + String backups = tempFolder.newFolder().getAbsolutePath(); + this.stateBackend = new FsStateBackend("file://" + backups, true); break; } case ROCKSDB_FULLY_ASYNC: { @@ -138,9 +146,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog @Test public void testTumblingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 100; - final int NUM_KEYS = 100; + final int NUM_ELEMENTS_PER_KEY = numElementsPerKey(); + final int WINDOW_SIZE = windowSize(); + final int NUM_KEYS = numKeys(); FailingSource.reset(); try { @@ -211,9 +219,9 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } public void doTestTumblingTimeWindowWithKVState(int maxParallelism) { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 100; - final int NUM_KEYS = 100; + final int NUM_ELEMENTS_PER_KEY = numElementsPerKey(); + final int WINDOW_SIZE = windowSize(); + final int NUM_KEYS = numKeys(); FailingSource.reset(); try { @@ -280,10 +288,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog @Test public void testSlidingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 1000; - final int WINDOW_SLIDE = 100; - final int NUM_KEYS = 100; + final int NUM_ELEMENTS_PER_KEY = numElementsPerKey(); + final int WINDOW_SIZE = windowSize(); + final int WINDOW_SLIDE = windowSlide(); + final int NUM_KEYS = numKeys(); FailingSource.reset(); try { @@ -346,9 +354,9 @@ 
public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog @Test public void testPreAggregatedTumblingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 100; - final int NUM_KEYS = 100; + final int NUM_ELEMENTS_PER_KEY = numElementsPerKey(); + final int WINDOW_SIZE = windowSize(); + final int NUM_KEYS = numKeys(); FailingSource.reset(); try { @@ -418,10 +426,10 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog @Test public void testPreAggregatedSlidingTimeWindow() { - final int NUM_ELEMENTS_PER_KEY = 3000; - final int WINDOW_SIZE = 1000; - final int WINDOW_SLIDE = 100; - final int NUM_KEYS = 100; + final int NUM_ELEMENTS_PER_KEY = numElementsPerKey(); + final int WINDOW_SIZE = windowSize(); + final int WINDOW_SLIDE = windowSlide(); + final int NUM_KEYS = numKeys(); FailingSource.reset(); try { @@ -790,4 +798,20 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog public IntType(int value) { this.value = value; } } + + protected int numElementsPerKey() { + return 300; + } + + protected int windowSize() { + return 100; + } + + protected int windowSlide() { + return 100; + } + + protected int numKeys() { + return 20; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..a5bf10c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public AsyncFileBackendEventTimeWindowCheckpointingITCase() { + super(StateBackendEnum.FILE_ASYNC); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..ef9ad37 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public AsyncMemBackendEventTimeWindowCheckpointingITCase() { + super(StateBackendEnum.MEM_ASYNC); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java index 14feb78..da2bbc7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -23,4 +23,24 @@ public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEv public RocksDbBackendEventTimeWindowCheckpointingITCase() { super(StateBackendEnum.ROCKSDB_FULLY_ASYNC); } + + @Override + protected int numElementsPerKey() { + return 3000; + } + + @Override + protected int windowSize() { + return 1000; + } + + @Override + 
protected int windowSlide() { + return 100; + } + + @Override + protected int numKeys() { + return 100; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ab014ef9/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java index 456861a..cbb56d0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java @@ -33,6 +33,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.util.Random; + /** * A collection of manual tests that serve to assess the performance of windowed operations. These * run in local mode with parallelism 1 with a source that emits data as fast as possible. Thus, @@ -241,11 +243,10 @@ public class ManualWindowSpeedITCase extends StreamingMultipleProgramsTestBase { @Override public void run(SourceContext<Tuple2<String, Integer>> out) throws Exception { - long index = 0; + Random random = new Random(42); while (running) { - Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (index % numKeys), 1); + Tuple2<String, Integer> tuple = new Tuple2<String, Integer>("Tuple " + (random.nextInt(numKeys)), 1); out.collect(tuple); - index++; } }
