http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java new file mode 100644 index 0000000..2a7947d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of {@link Serde} which serializes and deserializes lists. + */ [email protected] +public class SerdeListSlice<T> implements Serde<List<T>, Slice> +{ + @NotNull + private Serde<T, Slice> serde; + + private SerdeListSlice() + { + // for Kryo + } + + /** + * Creates a {@link SerdeListSlice}. + * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list. + */ + public SerdeListSlice(@NotNull Serde<T, Slice> serde) + { + this.serde = Preconditions.checkNotNull(serde); + } + + @Override + public Slice serialize(List<T> objects) + { + Slice[] slices = new Slice[objects.size()]; + + int size = 4; + + for (int index = 0; index < objects.size(); index++) { + Slice slice = serde.serialize(objects.get(index)); + slices[index] = slice; + size += slice.length; + } + + byte[] bytes = new byte[size]; + int offset = 0; + + byte[] sizeBytes = GPOUtils.serializeInt(objects.size()); + System.arraycopy(sizeBytes, 0, bytes, offset, 4); + offset += 4; + + for (int index = 0; index < slices.length; index++) { + Slice slice = slices[index]; + System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length); + offset += slice.length; + } + + return new Slice(bytes); + } + + @Override + public List<T> deserialize(Slice slice, MutableInt offset) + { + MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue()); + + int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset); + List<T> list = Lists.newArrayListWithCapacity(numElements); + sliceOffset.subtract(slice.offset); + + for (int index = 0; index < numElements; index++) { + T object = serde.deserialize(slice, sliceOffset); + list.add(object); + } + + offset.setValue(sliceOffset.intValue()); + return list; + } + + @Override + public List<T> deserialize(Slice slice) + { + return deserialize(slice, new MutableInt(0)); + } +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java new file mode 100644 index 0000000..80ee597 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * An implementation of {@link Serde} which serializes and deserializes {@link String}s. + */ [email protected] +public class SerdeStringSlice implements Serde<String, Slice> +{ + @Override + public Slice serialize(String object) + { + return new Slice(GPOUtils.serializeString(object)); + } + + @Override + public String deserialize(Slice object, MutableInt offset) + { + offset.add(object.offset); + String string = GPOUtils.deserializeString(object.buffer, offset); + offset.subtract(object.offset); + return string; + } + + @Override + public String deserialize(Slice object) + { + return deserialize(object, new MutableInt(0)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java new file mode 100644 index 0000000..b6a61f4 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.netlet.util.Slice; + +/** + * A utility class which contains static methods for manipulating byte arrays and {@link Slice}s + */ [email protected] +public class SliceUtils +{ + private SliceUtils() + { + } + + /** + * Concatenates two byte arrays. + * @param a The first byte array to concatenate. + * @param b The second byte array to concatenate. + * @return The concatenated byte arrays. + */ + public static byte[] concatenate(byte[] a, byte[] b) + { + byte[] output = new byte[a.length + b.length]; + + System.arraycopy(a, 0, output, 0, a.length); + System.arraycopy(b, 0, output, a.length, b.length); + return output; + } + + /** + * Concatenates two {@link Slice}s + * @param a The first {@link Slice} to concatenate. + * @param b The second {@link Slice} to concatenate. + * @return The concatenated {@link Slice}. + */ + public static Slice concatenate(Slice a, Slice b) + { + int size = a.length + b.length; + byte[] bytes = new byte[size]; + + System.arraycopy(a.buffer, a.offset, bytes, 0, a.length); + System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length); + + return new Slice(bytes); + } + + /** + * Concatenates a byte array with the contents of a {@link Slice}. + * @param a The byte array to concatenate. The contents of the byte array appear first in the concatenation. + * @param b The {@link Slice} to concatenate a byte array with. + * @return A {@link Slice} whose contents are the concatenation of the input byte array and {@link Slice}. + */ + public static Slice concatenate(byte[] a, Slice b) + { + int size = a.length + b.length; + byte[] bytes = new byte[size]; + + System.arraycopy(a, 0, bytes, 0, a.length); + System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length); + + return new Slice(bytes); + } + + /** + * Concatenates a byte array with the contents of a {@link Slice}. + * @param a The byte array to concatenate. + * @param b The {@link Slice} to concatenate a byte array with. The contents of the {@link Slice} appear first in the + * concatenation. + * @return A {@link Slice} whose contents are the concatenation of the input byte array and {@link Slice}. + */ + public static Slice concatenate(Slice a, byte[] b) + { + int size = a.length + b.length; + byte[] bytes = new byte[size]; + + System.arraycopy(a.buffer, a.offset, bytes, 0, a.length); + System.arraycopy(b, 0, bytes, a.length, b.length); + + return new Slice(bytes); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/com/datatorrent/lib/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java index f3b2140..673054b 100644 --- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java +++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java @@ -26,15 +26,40 @@ import org.junit.runner.Description; import org.apache.commons.io.FileUtils; +import com.google.common.base.Preconditions; + import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Sink; import com.datatorrent.api.Stats; import com.datatorrent.api.StatsListener; +import com.datatorrent.netlet.util.Slice; public class TestUtils { + public static byte[] getByte(int val) + { + Preconditions.checkArgument(val <= Byte.MAX_VALUE); + return new byte[]{(byte)val}; + } + + public static byte[] getBytes(int val) + { + byte[] bytes = new byte[4]; + bytes[0] = (byte)(val & 0xFF); + bytes[1] = (byte)((val >> 8) & 0xFF); + bytes[2] = (byte)((val >> 16) & 0xFF); + bytes[3] = (byte)((val >> 24) & 0xFF); + + return bytes; + } + + public static Slice getSlice(int val) + { + return new Slice(getBytes(val)); + } + public static class TestInfo extends TestWatcher { public org.junit.runner.Description desc; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java index d3564ba..c57e0ca 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java @@ -44,7 +44,8 @@ import com.datatorrent.netlet.util.Slice; public class ManagedStateTestUtils { - static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> unsavedBucket, + public static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> + unsavedBucket, int keysPerTimeBucket) throws IOException { RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId); @@ -82,7 +83,7 @@ public class ManagedStateTestUtils Assert.assertEquals("data of bucket" + bucketId, testBucket, fromDisk); } - static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart) + public static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart) { Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap(); for (int i = startBucket; i < endBucket; i++) { @@ -92,7 +93,7 @@ public class ManagedStateTestUtils return data; } - static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart) + public static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart) { Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap(); for (int j = 0; j < 5; j++) { @@ -103,14 +104,14 @@ public class ManagedStateTestUtils return bucketData; } - static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath) + public static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath) { Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(DAG.APPLICATION_PATH, applicationPath); return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes); } - static Context.OperatorContext getOperatorContext(int operatorId) + public static Context.OperatorContext getOperatorContext(int operatorId) { Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes); @@ -118,7 +119,7 @@ public class ManagedStateTestUtils private static final transient Logger LOG = LoggerFactory.getLogger(ManagedStateTestUtils.class); - static Slice getSliceFor(String x) + public static Slice getSliceFor(String x) { return new Slice(x.getBytes()); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java deleted file mode 100644 index d73bd95..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.managed.spillable.inmem; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemMultiset; - -import com.esotericsoftware.kryo.Kryo; - -import com.datatorrent.lib.util.KryoCloneUtils; - -public class InMemMultisetTest -{ - @Test - public void serializationTest() - { - InMemMultiset<String> set = new InMemMultiset<>(); - - set.add("a"); - set.add("a"); - - InMemMultiset<String> cloned = KryoCloneUtils.cloneObject(new Kryo(), set); - - Assert.assertEquals(2, cloned.count("a")); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java deleted file mode 100644 index 573982a..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.managed.spillable.inmem; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableArrayList; - -import com.esotericsoftware.kryo.Kryo; - -import com.datatorrent.lib.util.KryoCloneUtils; - -public class InMemSpillableArrayListTest -{ - @Test - public void serializationTest() - { - InMemSpillableArrayList<String> list = new InMemSpillableArrayList<>(); - - list.add("a"); - list.add("a"); - - InMemSpillableArrayList<String> cloned = KryoCloneUtils.cloneObject(new Kryo(), list); - - Assert.assertEquals(2, list.size()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java deleted file mode 100644 index a6bf811..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.managed.spillable.inmem; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableByteArrayListMultimap; - -import com.esotericsoftware.kryo.Kryo; - -import com.datatorrent.lib.util.KryoCloneUtils; - -public class InMemSpillableByteArrayListMultimapTest -{ - @Test - public void serializationTest() - { - InMemSpillableByteArrayListMultimap<String, String> multimap = - new InMemSpillableByteArrayListMultimap<String, String>(); - - multimap.put("a", "b"); - multimap.put("a", "c"); - - InMemSpillableByteArrayListMultimap<String, String> cloned = KryoCloneUtils.cloneObject(new Kryo(), multimap); - - Assert.assertEquals(2, cloned.get("a").size()); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java new file mode 100644 index 0000000..f40ae87 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.lib.util.TestUtils; + +public class SequentialSpillableIdentifierGeneratorTest +{ + @Test + public void dontAllowRegistrationAfterNextCallTest() + { + SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator(); + + gen.next(); + + boolean exception = false; + + try { + gen.register(TestUtils.getByte(1)); + } catch (Exception e) { + exception = true; + } + + Assert.assertTrue(exception); + } + + @Test + public void simpleSequentialIdGenerationTest() + { + SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator(); + + for (int index = 0; index < (((int)Byte.MAX_VALUE) + 1); index++) { + byte[] id = gen.next(); + + checkId(index, id); + } + + boolean threwException = false; + + try { + gen.next(); + } catch (Exception e) { + threwException = true; + } + + Assert.assertTrue(threwException); + } + + @Test + public void registerFirst() + { + SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator(); + gen.register(TestUtils.getByte(0)); + + byte[] id = gen.next(); + + Assert.assertArrayEquals(TestUtils.getByte(1), id); + } + + @Test + public void registerLast() + { + SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator(); + gen.register(TestUtils.getByte(Byte.MAX_VALUE)); + + for (int index = 0; index <= (((int)Byte.MAX_VALUE) - 1); index++) { + byte[] id = gen.next(); + + checkId(index, id); + } + + boolean threwException = false; + + try { + gen.next(); + } catch (Exception e) { + threwException = true; + } + + Assert.assertTrue(threwException); + } + + @Test + public void intermingledRegistered() + { + SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator(); + + gen.register(TestUtils.getByte(1)); + gen.register(TestUtils.getByte(2)); + gen.register(TestUtils.getByte(5)); + gen.register(TestUtils.getByte(7)); + + checkId(0, gen.next()); + checkId(3, gen.next()); + checkId(4, gen.next()); + checkId(6, gen.next()); + checkId(8, gen.next()); + checkId(9, gen.next()); + } + + private void checkId(int val, byte[] id) + { + Assert.assertEquals(1, id.length); + Assert.assertEquals(val, id[0]); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java new file mode 100644 index 0000000..af05c88 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java @@ -0,0 +1,594 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; + +public class SpillableArrayListImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + public static final byte[] ID2 = new byte[]{(byte)1}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleAddGetAndSetTest1() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleAddGetAndSetTest1Helper(store); + } + + @Test + public void simpleAddGetAndSetManagedStateTest1() + { + simpleAddGetAndSetTest1Helper(testMeta.store); + } + + public void simpleAddGetAndSetTest1Helper(SpillableStateStore store) + { + SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store, + new SerdeStringSlice(), 1); + + store.setup(testMeta.operatorContext); + list.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkOutOfBounds(list, 0); + Assert.assertEquals(0, list.size()); + + list.add("a"); + + SpillableTestUtils.checkOutOfBounds(list, 1); + Assert.assertEquals(1, list.size()); + + Assert.assertEquals("a", list.get(0)); + + list.addAll(Lists.newArrayList("a", "b", "c")); + + Assert.assertEquals(4, list.size()); + + Assert.assertEquals("a", list.get(0)); + Assert.assertEquals("a", list.get(1)); + Assert.assertEquals("b", list.get(2)); + Assert.assertEquals("c", list.get(3)); + + SpillableTestUtils.checkOutOfBounds(list, 4); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a")); + SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("a")); + SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b")); + SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c")); + + Assert.assertEquals(4, list.size()); + + Assert.assertEquals("a", list.get(0)); + Assert.assertEquals("a", list.get(1)); + Assert.assertEquals("b", list.get(2)); + Assert.assertEquals("c", list.get(3)); + + list.add("tt"); + list.add("ab"); + list.add("99"); + list.add("oo"); + + Assert.assertEquals("tt", list.get(4)); + Assert.assertEquals("ab", list.get(5)); + Assert.assertEquals("99", list.get(6)); + Assert.assertEquals("oo", list.get(7)); + + list.set(1, "111"); + + Assert.assertEquals("a", list.get(0)); + Assert.assertEquals("111", list.get(1)); + Assert.assertEquals("b", list.get(2)); + Assert.assertEquals("c", list.get(3)); + Assert.assertEquals("tt", list.get(4)); + Assert.assertEquals("ab", list.get(5)); + Assert.assertEquals("99", list.get(6)); + Assert.assertEquals("oo", list.get(7)); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a")); + SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("111")); + SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b")); + SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c")); + SpillableTestUtils.checkValue(store, 0L, ID1, 4, Lists.newArrayList("tt")); + SpillableTestUtils.checkValue(store, 0L, ID1, 5, Lists.newArrayList("ab")); + SpillableTestUtils.checkValue(store, 0L, ID1, 6, Lists.newArrayList("99")); + SpillableTestUtils.checkValue(store, 0L, ID1, 7, Lists.newArrayList("oo")); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + list.teardown(); + store.teardown(); + } + + @Test + public void simpleAddGetAndSetTest3() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleAddGetAndSetTest3Helper(store); + } + + @Test + public void simpleAddGetAndSetManagedStateTest3() + { + simpleAddGetAndSetTest3Helper(testMeta.store); + } + + private void simpleAddGetAndSetTest3Helper(SpillableStateStore store) + { + SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store, + new SerdeStringSlice(), 3); + + store.setup(testMeta.operatorContext); + list.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkOutOfBounds(list, 0); + Assert.assertEquals(0, list.size()); + + list.add("a"); + + SpillableTestUtils.checkOutOfBounds(list, 1); + Assert.assertEquals(1, list.size()); + + Assert.assertEquals("a", list.get(0)); + + list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g")); + + Assert.assertEquals(8, list.size()); + + Assert.assertEquals("a", list.get(0)); + Assert.assertEquals("a", list.get(1)); + Assert.assertEquals("b", list.get(2)); + Assert.assertEquals("c", list.get(3)); + Assert.assertEquals("d", list.get(4)); + Assert.assertEquals("e", list.get(5)); + Assert.assertEquals("f", list.get(6)); + Assert.assertEquals("g", list.get(7)); + + SpillableTestUtils.checkOutOfBounds(list, 20); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "a", "b")); + SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("c", "d", "e")); + SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g")); + + Assert.assertEquals(8, list.size()); + + Assert.assertEquals("a", list.get(0)); + Assert.assertEquals("a", list.get(1)); + Assert.assertEquals("b", list.get(2)); + Assert.assertEquals("c", list.get(3)); + Assert.assertEquals("d", list.get(4)); + Assert.assertEquals("e", list.get(5)); + Assert.assertEquals("f", list.get(6)); + Assert.assertEquals("g", list.get(7)); + + list.add("tt"); + list.add("ab"); + list.add("99"); + list.add("oo"); + + Assert.assertEquals("tt", list.get(8)); + Assert.assertEquals("ab", list.get(9)); + Assert.assertEquals("99", list.get(10)); + Assert.assertEquals("oo", list.get(11)); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "a", "b")); + SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("c", "d", "e")); + SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g", "tt")); + SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("ab", "99", "oo")); + + list.set(1, "111"); + list.set(3, "222"); + list.set(5, "333"); + list.set(11, "444"); + + Assert.assertEquals("a", list.get(0)); + Assert.assertEquals("111", list.get(1)); + Assert.assertEquals("b", list.get(2)); + Assert.assertEquals("222", list.get(3)); + Assert.assertEquals("d", list.get(4)); + Assert.assertEquals("333", list.get(5)); + Assert.assertEquals("f", list.get(6)); + Assert.assertEquals("g", list.get(7)); + Assert.assertEquals("tt", list.get(8)); + Assert.assertEquals("ab", list.get(9)); + Assert.assertEquals("99", list.get(10)); + Assert.assertEquals("444", list.get(11)); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "111", "b")); + SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("222", "d", "333")); + SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g", "tt")); + SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("ab", "99", "444")); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + list.teardown(); + store.teardown(); + } + + @Test + public void simpleMultiListTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleMultiListTestHelper(store); + } + + @Test + public void simpleMultiListManagedStateTest() + { + simpleMultiListTestHelper(testMeta.store); + } + + public void simpleMultiListTestHelper(SpillableStateStore store) + { + SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, ID1, store, + new SerdeStringSlice(), 1); + + SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, ID2, store, + new SerdeStringSlice(), 1); + + store.setup(testMeta.operatorContext); + list1.setup(testMeta.operatorContext); + list2.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + list1.beginWindow(windowId); + list2.beginWindow(windowId); + + SpillableTestUtils.checkOutOfBounds(list1, 0); + Assert.assertEquals(0, list1.size()); + + list1.add("a"); + + SpillableTestUtils.checkOutOfBounds(list2, 0); + + list2.add("2a"); + + SpillableTestUtils.checkOutOfBounds(list1, 1); + SpillableTestUtils.checkOutOfBounds(list2, 1); + + Assert.assertEquals(1, list1.size()); + Assert.assertEquals(1, list2.size()); + + Assert.assertEquals("a", list1.get(0)); + Assert.assertEquals("2a", list2.get(0)); + + list1.addAll(Lists.newArrayList("a", "b", "c")); + list2.addAll(Lists.newArrayList("2a", "2b")); + + Assert.assertEquals(4, list1.size()); + Assert.assertEquals(3, list2.size()); + + Assert.assertEquals("a", list1.get(0)); + Assert.assertEquals("a", list1.get(1)); + Assert.assertEquals("b", list1.get(2)); + Assert.assertEquals("c", list1.get(3)); + + Assert.assertEquals("2a", list2.get(0)); + Assert.assertEquals("2a", list2.get(1)); + Assert.assertEquals("2b", list2.get(2)); + + SpillableTestUtils.checkOutOfBounds(list1, 4); + SpillableTestUtils.checkOutOfBounds(list2, 3); + + list1.endWindow(); + list2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a")); + SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("a")); + SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b")); + SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c")); + + SpillableTestUtils.checkValue(store, 0L, ID2, 0, Lists.newArrayList("2a")); + SpillableTestUtils.checkValue(store, 0L, ID2, 1, Lists.newArrayList("2a")); + SpillableTestUtils.checkValue(store, 0L, ID2, 2, Lists.newArrayList("2b")); + + windowId++; + store.beginWindow(windowId); + list1.beginWindow(windowId); + list2.beginWindow(windowId); + + Assert.assertEquals(4, list1.size()); + Assert.assertEquals(3, list2.size()); + + Assert.assertEquals("a", list1.get(0)); + Assert.assertEquals("a", list1.get(1)); + Assert.assertEquals("b", list1.get(2)); + Assert.assertEquals("c", list1.get(3)); + + Assert.assertEquals("2a", list2.get(0)); + Assert.assertEquals("2a", list2.get(1)); + Assert.assertEquals("2b", list2.get(2)); + + list1.add("tt"); + list1.add("ab"); + list1.add("99"); + list1.add("oo"); + + list2.add("2tt"); + list2.add("2ab"); + + Assert.assertEquals("tt", list1.get(4)); + Assert.assertEquals("ab", list1.get(5)); + Assert.assertEquals("99", list1.get(6)); + Assert.assertEquals("oo", list1.get(7)); + + Assert.assertEquals("2tt", list2.get(3)); + Assert.assertEquals("2ab", list2.get(4)); + + list1.set(1, "111"); + list2.set(1, "2111"); + + Assert.assertEquals("a", list1.get(0)); + Assert.assertEquals("111", list1.get(1)); + Assert.assertEquals("b", list1.get(2)); + Assert.assertEquals("c", list1.get(3)); + Assert.assertEquals("tt", list1.get(4)); + Assert.assertEquals("ab", list1.get(5)); + Assert.assertEquals("99", list1.get(6)); + Assert.assertEquals("oo", list1.get(7)); + + Assert.assertEquals("2a", list2.get(0)); + Assert.assertEquals("2111", list2.get(1)); + Assert.assertEquals("2b", list2.get(2)); + Assert.assertEquals("2tt", list2.get(3)); + Assert.assertEquals("2ab", list2.get(4)); + + list1.endWindow(); + list2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list1.beginWindow(windowId); + list2.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a")); + SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("111")); + SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b")); + SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c")); + SpillableTestUtils.checkValue(store, 0L, ID1, 4, Lists.newArrayList("tt")); + SpillableTestUtils.checkValue(store, 0L, ID1, 5, Lists.newArrayList("ab")); + SpillableTestUtils.checkValue(store, 0L, ID1, 6, Lists.newArrayList("99")); + SpillableTestUtils.checkValue(store, 0L, ID1, 7, Lists.newArrayList("oo")); + + SpillableTestUtils.checkValue(store, 0L, ID2, 0, Lists.newArrayList("2a")); + SpillableTestUtils.checkValue(store, 0L, ID2, 1, Lists.newArrayList("2111")); + SpillableTestUtils.checkValue(store, 0L, ID2, 2, Lists.newArrayList("2b")); + SpillableTestUtils.checkValue(store, 0L, ID2, 3, Lists.newArrayList("2tt")); + SpillableTestUtils.checkValue(store, 0L, ID2, 4, Lists.newArrayList("2ab")); + + list1.endWindow(); + list2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + list1.teardown(); + list2.teardown(); + store.teardown(); + } + + @Test + public void recoveryManagedStateTest() + { + SpillableStateStore store = testMeta.store; + + SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store, + new SerdeStringSlice(), 3); + + store.setup(testMeta.operatorContext); + list.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + list.beginWindow(windowId); + + SpillableTestUtils.checkOutOfBounds(list, 0); + + list.add("a"); + list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g")); + + Assert.assertEquals(8, list.size()); + + list.endWindow(); + store.endWindow(); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + list.add("tt"); + list.add("ab"); + list.add("99"); + list.add("oo"); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + list.set(1, "111"); + list.set(3, "222"); + list.set(5, "333"); + list.set(11, "444"); + + list.endWindow(); + store.endWindow(); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + list.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + long activationWindow = windowId; + SpillableArrayListImpl<String> clonedList = KryoCloneUtils.cloneObject(list); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + list.beginWindow(windowId); + + list.set(1, "111111"); + list.set(3, "222222"); + list.add("xyz"); + + list.endWindow(); + store.endWindow(); + + list.teardown(); + store.teardown(); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow); + Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + + list = clonedList; + store = clonedList.getStore(); + + store.setup(context); + list.setup(context); + + windowId = activationWindow + 1L; + store.beginWindow(windowId); + list.beginWindow(windowId); + + Assert.assertEquals("a", list.get(0)); + Assert.assertEquals("111", list.get(1)); + Assert.assertEquals("b", list.get(2)); + Assert.assertEquals("222", list.get(3)); + Assert.assertEquals("d", list.get(4)); + Assert.assertEquals("333", list.get(5)); + Assert.assertEquals("f", list.get(6)); + Assert.assertEquals("g", list.get(7)); + Assert.assertEquals("tt", list.get(8)); + Assert.assertEquals("ab", list.get(9)); + Assert.assertEquals("99", list.get(10)); + Assert.assertEquals("444", list.get(11)); + Assert.assertEquals(12, list.size()); + + list.endWindow(); + store.endWindow(); + + list.teardown(); + store.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java new file mode 100644 index 0000000..42d7d20 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java @@ -0,0 +1,341 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.List; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.Slice; + +public class SpillableByteArrayListMultimapImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleMultiKeyTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleMultiKeyTestHelper(store); + } + + @Test + public void simpleMultiKeyManagedStateTest() + { + simpleMultiKeyTestHelper(testMeta.store); + } + + public void simpleMultiKeyTestHelper(SpillableStateStore store) + { + SpillableByteArrayListMultimapImpl<String, String> map = + new SpillableByteArrayListMultimapImpl<String, String>(store, ID1, 0L, new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long nextWindowId = 0L; + nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); + nextWindowId++; + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(1, map.size()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId); + nextWindowId++; + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(2, map.size()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + simpleMultiKeyTestHelper(store, map, "c", nextWindowId); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(3, map.size()); + + map.endWindow(); + store.endWindow(); + + map.teardown(); + store.teardown(); + } + + public long simpleMultiKeyTestHelper(SpillableStateStore store, + SpillableByteArrayListMultimapImpl<String, String> map, String key, long nextWindowId) + { + SerdeStringSlice serdeString = new SerdeStringSlice(); + SerdeIntSlice serdeInt = new SerdeIntSlice(); + + Slice keySlice = serdeString.serialize(key); + + byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertNull(map.get(key)); + + Assert.assertFalse(map.containsKey(key)); + + map.put(key, "a"); + + Assert.assertTrue(map.containsKey(key)); + + List<String> list1 = map.get(key); + Assert.assertEquals(1, list1.size()); + + Assert.assertEquals("a", list1.get(0)); + + list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g")); + + Assert.assertEquals(8, list1.size()); + + Assert.assertEquals("a", list1.get(0)); + Assert.assertEquals("a", list1.get(1)); + Assert.assertEquals("b", list1.get(2)); + Assert.assertEquals("c", list1.get(3)); + Assert.assertEquals("d", list1.get(4)); + Assert.assertEquals("e", list1.get(5)); + Assert.assertEquals("f", list1.get(6)); + Assert.assertEquals("g", list1.get(7)); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + SpillableTestUtils.checkValue(store, 0L, + SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt); + + SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e", + "f", "g")); + + List<String> list2 = map.get(key); + + Assert.assertEquals(8, list2.size()); + + Assert.assertEquals("a", list2.get(0)); + Assert.assertEquals("a", list2.get(1)); + Assert.assertEquals("b", list2.get(2)); + Assert.assertEquals("c", list2.get(3)); + Assert.assertEquals("d", list2.get(4)); + Assert.assertEquals("e", list2.get(5)); + Assert.assertEquals("f", list2.get(6)); + Assert.assertEquals("g", list2.get(7)); + + list2.add("tt"); + list2.add("ab"); + list2.add("99"); + list2.add("oo"); + + Assert.assertEquals("tt", list2.get(8)); + Assert.assertEquals("ab", list2.get(9)); + Assert.assertEquals("99", list2.get(10)); + Assert.assertEquals("oo", list2.get(11)); + + Assert.assertEquals(12, list2.size()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + Assert.assertEquals(12, list2.size()); + + SpillableTestUtils.checkValue(store, 0L, + SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt); + + SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e", + "f", "g", "tt", "ab", "99", "oo")); + + List<String> list3 = map.get(key); + + list3.set(1, "111"); + list3.set(3, "222"); + list3.set(5, "333"); + list3.set(11, "444"); + + Assert.assertEquals("a", list3.get(0)); + Assert.assertEquals("111", list3.get(1)); + Assert.assertEquals("b", list3.get(2)); + Assert.assertEquals("222", list3.get(3)); + Assert.assertEquals("d", list3.get(4)); + Assert.assertEquals("333", list3.get(5)); + Assert.assertEquals("f", list3.get(6)); + Assert.assertEquals("g", list3.get(7)); + Assert.assertEquals("tt", list3.get(8)); + Assert.assertEquals("ab", list3.get(9)); + Assert.assertEquals("99", list3.get(10)); + Assert.assertEquals("444", list3.get(11)); + + Assert.assertEquals(12, list2.size()); + + map.endWindow(); + store.endWindow(); + + nextWindowId++; + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + SpillableTestUtils.checkValue(store, 0L, + SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt); + + SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", "333", + "f", "g", "tt", "ab", "99", "444")); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(nextWindowId); + + return nextWindowId; + } + + @Test + public void recoveryTestWithManagedState() + { + SpillableStateStore store = testMeta.store; + + SpillableByteArrayListMultimapImpl<String, String> map = + new SpillableByteArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long nextWindowId = 0L; + nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); + long activationWindow = nextWindowId; + nextWindowId++; + + SpillableByteArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map); + store.checkpointed(nextWindowId); + store.committed(nextWindowId); + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + List<String> list1 = map.get("a"); + + Assert.assertEquals(12, list1.size()); + + Assert.assertEquals("a", list1.get(0)); + Assert.assertEquals("111", list1.get(1)); + Assert.assertEquals("b", list1.get(2)); + Assert.assertEquals("222", list1.get(3)); + Assert.assertEquals("d", list1.get(4)); + Assert.assertEquals("333", list1.get(5)); + Assert.assertEquals("f", list1.get(6)); + Assert.assertEquals("g", list1.get(7)); + Assert.assertEquals("tt", list1.get(8)); + Assert.assertEquals("ab", list1.get(9)); + Assert.assertEquals("99", list1.get(10)); + Assert.assertEquals("444", list1.get(11)); + + list1.add("111"); + + Assert.assertEquals("a", list1.get(0)); + Assert.assertEquals("111", list1.get(1)); + Assert.assertEquals("b", list1.get(2)); + Assert.assertEquals("222", list1.get(3)); + Assert.assertEquals("d", list1.get(4)); + Assert.assertEquals("333", list1.get(5)); + Assert.assertEquals("f", list1.get(6)); + Assert.assertEquals("g", list1.get(7)); + Assert.assertEquals("tt", list1.get(8)); + Assert.assertEquals("ab", list1.get(9)); + Assert.assertEquals("99", list1.get(10)); + Assert.assertEquals("444", list1.get(11)); + Assert.assertEquals("111", list1.get(12)); + + Assert.assertEquals(13, list1.size()); + + map.endWindow(); + store.endWindow(); + + map.teardown(); + store.teardown(); + + map = clonedMap; + store = map.getStore(); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow); + Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + + store.setup(context); + map.setup(context); + + store.beginWindow(nextWindowId); + map.beginWindow(nextWindowId); + + SerdeStringSlice serdeString = new SerdeStringSlice(); + Slice keySlice = serdeString.serialize("a"); + byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); + + SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", + "333", "f", "g", "tt", "ab", "99", "444")); + + Assert.assertEquals(1, map.size()); + Assert.assertEquals(12, map.get("a").size()); + + map.endWindow(); + store.endWindow(); + + map.teardown(); + store.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java new file mode 100644 index 0000000..63f7b79 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java @@ -0,0 +1,484 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; + +public class SpillableByteMapImplTest +{ + public static final byte[] ID1 = new byte[]{(byte)0}; + public static final byte[] ID2 = new byte[]{(byte)1}; + + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleGetAndPutTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleGetAndPutTestHelper(store); + } + + @Test + public void simpleGetAndPutManagedStateTest() + { + simpleGetAndPutTestHelper(testMeta.store); + } + + private void simpleGetAndPutTestHelper(SpillableStateStore store) + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(0, map.size()); + + map.put("a", "1"); + map.put("b", "2"); + map.put("c", "3"); + + Assert.assertEquals(3, map.size()); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals("2", map.get("b")); + Assert.assertEquals("3", map.get("c")); + Assert.assertEquals(null, map.get("d")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + Assert.assertEquals(3, map.size()); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals("2", map.get("b")); + Assert.assertEquals("3", map.get("c")); + Assert.assertEquals(null, map.get("d")); + + map.put("d", "4"); + map.put("e", "5"); + map.put("f", "6"); + + Assert.assertEquals(6, map.size()); + + Assert.assertEquals("4", map.get("d")); + Assert.assertEquals("5", map.get("e")); + Assert.assertEquals("6", map.get("f")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + map.teardown(); + store.teardown(); + } + + @Test + public void simpleRemoveTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleRemoveTestHelper(store); + } + + @Test + public void simpleRemoveManagedStateTest() + { + simpleRemoveTestHelper(testMeta.store); + } + + private void simpleRemoveTestHelper(SpillableStateStore store) + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(0, map.size()); + + map.put("a", "1"); + map.put("b", "2"); + map.put("c", "3"); + + Assert.assertEquals(3, map.size()); + + map.remove("b"); + map.remove("c"); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals(null, map.get("b")); + Assert.assertEquals(null, map.get("c")); + Assert.assertEquals(null, map.get("d")); + + Assert.assertEquals(1, map.size()); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + Assert.assertEquals(1, map.size()); + + Assert.assertEquals("1", map.get("a")); + Assert.assertEquals(null, map.get("b")); + Assert.assertEquals(null, map.get("c")); + Assert.assertEquals(null, map.get("d")); + + map.put("d", "4"); + map.put("e", "5"); + map.put("f", "6"); + + Assert.assertEquals(4, map.size()); + + Assert.assertEquals("4", map.get("d")); + Assert.assertEquals("5", map.get("e")); + Assert.assertEquals("6", map.get("f")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.remove("a"); + map.remove("d"); + Assert.assertEquals(null, map.get("a")); + Assert.assertEquals(null, map.get("b")); + Assert.assertEquals(null, map.get("c")); + Assert.assertEquals(null, map.get("d")); + Assert.assertEquals("5", map.get("e")); + Assert.assertEquals("6", map.get("f")); + Assert.assertEquals(null, map.get("g")); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + windowId++; + store.beginWindow(windowId); + map.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); + SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); + SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + map.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + store.committed(windowId); + + map.teardown(); + store.teardown(); + } + + @Test + public void multiMapPerBucketTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + multiMapPerBucketTestHelper(store); + } + + @Test + public void multiMapPerBucketManagedStateTest() + { + multiMapPerBucketTestHelper(testMeta.store); + } + + public void multiMapPerBucketTestHelper(SpillableStateStore store) + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + SpillableByteMapImpl<String, String> map2 = new SpillableByteMapImpl<>(store, ID2, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + store.setup(testMeta.operatorContext); + map1.setup(testMeta.operatorContext); + map2.setup(testMeta.operatorContext); + + long windowId = 0L; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + map1.put("a", "1"); + + Assert.assertEquals("1", map1.get("a")); + Assert.assertEquals(null, map2.get("a")); + + map2.put("a", "a1"); + + Assert.assertEquals("1", map1.get("a")); + Assert.assertEquals("a1", map2.get("a")); + + map1.put("b", "2"); + map2.put("c", "3"); + + Assert.assertEquals("1", map1.get("a")); + Assert.assertEquals("2", map1.get("b")); + + Assert.assertEquals("a1", map2.get("a")); + Assert.assertEquals(null, map2.get("b")); + Assert.assertEquals("3", map2.get("c")); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + windowId++; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + + SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); + SpillableTestUtils.checkValue(store, 0L, "b", ID2, null); + SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3"); + + map1.remove("a"); + + Assert.assertEquals(null, map1.get("a")); + Assert.assertEquals("a1", map2.get("a")); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + windowId++; + store.beginWindow(windowId); + map1.beginWindow(windowId); + map2.beginWindow(windowId); + + SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); + SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); + + map1.endWindow(); + map2.endWindow(); + store.endWindow(); + store.beforeCheckpoint(windowId); + store.checkpointed(windowId); + + map1.teardown(); + map2.teardown(); + store.teardown(); + } + + @Test + public void recoveryWithManagedStateTest() throws Exception + { + SerdeStringSlice sss = new SerdeStringSlice(); + + SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(testMeta.store, ID1, 0L, + new SerdeStringSlice(), + new SerdeStringSlice()); + + testMeta.store.setup(testMeta.operatorContext); + map1.setup(testMeta.operatorContext); + + testMeta.store.beginWindow(0); + map1.beginWindow(0); + map1.put("x", "1"); + map1.put("y", "2"); + map1.put("z", "3"); + map1.put("zz", "33"); + Assert.assertEquals(4, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + + testMeta.store.beginWindow(1); + map1.beginWindow(1); + Assert.assertEquals(4, map1.size()); + map1.put("x", "4"); + map1.put("y", "5"); + map1.remove("zz"); + Assert.assertEquals(3, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + testMeta.store.beforeCheckpoint(1); + testMeta.store.checkpointed(1); + + SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1); + + testMeta.store.beginWindow(2); + map1.beginWindow(2); + Assert.assertEquals(3, map1.size()); + map1.put("x", "6"); + map1.put("y", "7"); + map1.put("w", "8"); + Assert.assertEquals(4, map1.size()); + map1.endWindow(); + testMeta.store.endWindow(); + + // simulating crash here + map1.teardown(); + testMeta.store.teardown(); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L); + Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + + map1 = clonedMap1; + map1.getStore().setup(context); + map1.setup(testMeta.operatorContext); + + map1.getStore().beginWindow(2); + map1.beginWindow(2); + Assert.assertEquals(3, map1.size()); + Assert.assertEquals("4", map1.get("x")); + Assert.assertEquals("5", map1.get("y")); + Assert.assertEquals("3", map1.get("z")); + map1.endWindow(); + map1.getStore().endWindow(); + + map1.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java new file mode 100644 index 0000000..67db6ba --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Rule; +import org.junit.Test; + +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; + +public class SpillableComplexComponentImplTest +{ + @Rule + public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + + @Test + public void simpleIntegrationTest() + { + InMemSpillableStateStore store = new InMemSpillableStateStore(); + + simpleIntegrationTestHelper(store); + } + + @Test + public void simpleIntegrationManagedStateTest() + { + simpleIntegrationTestHelper(testMeta.store); + } + + public void simpleIntegrationTestHelper(SpillableStateStore store) + { + SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store); + + Spillable.SpillableComponent scList = + (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new SerdeStringSlice()); + Spillable.SpillableComponent scMap = + (Spillable.SpillableComponent)sccImpl.newSpillableByteMap(0L, new SerdeStringSlice(), new SerdeStringSlice()); + + sccImpl.setup(testMeta.operatorContext); + + sccImpl.beginWindow(0L); + + sccImpl.endWindow(); + + sccImpl.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java new file mode 100644 index 0000000..00ea58d --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.List; + +import org.junit.Assert; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * This class contains utility methods that can be used by Spillable data structure unit tests. + */ +public class SpillableTestUtils +{ + public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice(); + public static SerdeListSlice<String> SERDE_STRING_LIST_SLICE = new SerdeListSlice(new SerdeStringSlice()); + + private SpillableTestUtils() + { + //Shouldn't instantiate this + } + + static class TestMeta extends TestWatcher + { + ManagedStateSpillableStateStore store; + Context.OperatorContext operatorContext; + String applicationPath; + + @Override + protected void starting(Description description) + { + TestUtils.deleteTargetTestClassFolder(description); + store = new ManagedStateSpillableStateStore(); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + ((FileAccessFSImpl)store.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + + operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath); + } + + @Override + protected void finished(Description description) + { + TestUtils.deleteTargetTestClassFolder(description); + } + } + + public static Slice getKeySlice(byte[] id, String key) + { + return SliceUtils.concatenate(id, SERDE_STRING_SLICE.serialize(key)); + } + + public static Slice getKeySlice(byte[] id, int index, String key) + { + return SliceUtils.concatenate(id, + SliceUtils.concatenate(GPOUtils.serializeInt(index), + SERDE_STRING_SLICE.serialize(key))); + } + + public static void checkValue(SpillableStateStore store, long bucketId, String key, + byte[] prefix, String expectedValue) + { + checkValue(store, bucketId, SliceUtils.concatenate(prefix, SERDE_STRING_SLICE.serialize(key)).buffer, + expectedValue, 0, SERDE_STRING_SLICE); + } + + public static void checkValue(SpillableStateStore store, long bucketId, + byte[] prefix, int index, List<String> expectedValue) + { + checkValue(store, bucketId, SliceUtils.concatenate(prefix, GPOUtils.serializeInt(index)), expectedValue, 0, + SERDE_STRING_LIST_SLICE); + } + + public static <T> void checkValue(SpillableStateStore store, long bucketId, byte[] bytes, + T expectedValue, int offset, Serde<T, Slice> serde) + { + Slice slice = store.getSync(bucketId, new Slice(bytes)); + + if (slice == null || slice.length == 0) { + if (expectedValue != null) { + Assert.assertEquals(expectedValue, slice); + } else { + return; + } + } + + T string = serde.deserialize(slice, new MutableInt(offset)); + + Assert.assertEquals(expectedValue, string); + } + + public static void checkOutOfBounds(SpillableArrayListImpl<String> list, int index) + { + boolean exceptionThrown = false; + + try { + list.get(index); + } catch (IndexOutOfBoundsException ex) { + exceptionThrown = true; + } + + Assert.assertTrue(exceptionThrown); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java new file mode 100644 index 0000000..8033a7d --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Created by tfarkas on 6/4/16. + */ +public class TimeBasedPriorityQueueTest +{ + @Test + public void simpleInsertAndRemoveTest() + { + TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>(); + queue.upSert("a"); + queue.remove("a"); + + overRemoveTest(queue, 1); + } + + @Test + public void simpleInsertAndLRURemoveTest() + { + TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>(); + queue.upSert("a"); + + Set<String> set = queue.removeLRU(1); + + Assert.assertEquals(Sets.newHashSet("a"), set); + } + + @Test + public void simpleLRUTest() throws Exception + { + TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>(); + + queue.upSert("a"); + Thread.sleep(1L); + + queue.upSert("b"); + Thread.sleep(1L); + + queue.upSert("a"); + + Set<String> set = queue.removeLRU(1); + + Assert.assertEquals(Sets.newHashSet("b"), set); + } + + @Test + public void complexLRUTest() throws Exception + { + //0, 3, 6, 9 + //1, 4, 7 + //2, 5, 8 + + TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>(); + + for (int counter = 0; counter < 10; counter++) { + String val = "" + counter; + + queue.upSert(val); + Thread.sleep(1L); + } + + for (int counter = 0; counter < 10; counter++) { + if (counter % 3 != 1) { + continue; + } + + String val = "" + counter; + queue.remove(val); + } + + for (int counter = 0; counter < 10; counter++) { + if (counter % 3 != 0) { + continue; + } + + String val = "" + counter; + queue.upSert(val); + Thread.sleep(1L); + } + + //2, 5, 8, 0, 3, 6, 9 + + overRemoveTest(queue, 8); + + Set<String> expiredValues = queue.removeLRU(3); + + Assert.assertEquals(Sets.newHashSet("2", "5", "8"), expiredValues); + + overRemoveTest(queue, 6); + + expiredValues = queue.removeLRU(4); + + Assert.assertEquals(Sets.newHashSet("0", "3", "6", "9"), expiredValues); + } + + private void overRemoveTest(TimeBasedPriorityQueue<String> queue, int removeCount) + { + boolean exceptionThrown = false; + + try { + queue.removeLRU(removeCount); + } catch (IllegalArgumentException e) { + exceptionThrown = true; + } + + Assert.assertTrue(exceptionThrown); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java new file mode 100644 index 0000000..b1710dd --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Sets; + +public class WindowBoundedMapCacheTest +{ + @Test + public void simplePutGetTest() + { + WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>(); + + long windowId = 0L; + + windowId++; + + cache.put("1", "a"); + Assert.assertEquals("a", cache.get("1")); + + cache.endWindow(); + + windowId++; + + Assert.assertEquals("a", cache.get("1")); + + cache.endWindow(); + } + + @Test + public void getChangedGetRemovedTest() + { + WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>(); + + long windowId = 0L; + + windowId++; + + cache.put("1", "a"); + cache.put("2", "b"); + + Assert.assertEquals(Sets.newHashSet("1", "2"), cache.getChangedKeys()); + Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys()); + + cache.endWindow(); + + windowId++; + + cache.remove("1"); + + Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys()); + Assert.assertEquals(Sets.newHashSet("1"), cache.getRemovedKeys()); + + Assert.assertEquals(null, cache.get("1")); + Assert.assertEquals("b", cache.get("2")); + + cache.endWindow(); + + windowId++; + + Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys()); + Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys()); + + cache.endWindow(); + } + + @Test + public void expirationTest() throws Exception + { + WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>(2); + + long windowId = 0L; + + windowId++; + + cache.put("1", "a"); + Thread.sleep(1L); + cache.put("2", "b"); + Thread.sleep(1L); + cache.put("3", "c"); + + Assert.assertEquals(Sets.newHashSet("1", "2", "3"), cache.getChangedKeys()); + + cache.endWindow(); + + windowId++; + + Assert.assertEquals(null, cache.get("1")); + Assert.assertEquals("b", cache.get("2")); + Assert.assertEquals("c", cache.get("3")); + + Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys()); + Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys()); + + cache.endWindow(); + } +}
