Repository: apex-malhar Updated Branches: refs/heads/master 37991576d -> 6ddefd02a
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
new file mode 100644
index 0000000..fa4cd73
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+/**
+ * This is a stream which manages blocks and supports window related operations.
+ * <p>
+ * Every block is registered under the window in which it was taken into use, so the blocks of
+ * finished windows can be reset and recycled through a pool of free block ids.
+ */
+public class WindowedBlockStream extends BlockStream implements WindowListener, WindowCompleteListener
+{
+  private static final Logger logger = LoggerFactory.getLogger(WindowedBlockStream.class);
+  /**
+   * Map from windowId to blockIds
+   */
+  protected SetMultimap<Long, Integer> windowToBlockIds = HashMultimap.create();
+
+  /**
+   * set of all free blockIds.
+   */
+  protected Set<Integer> freeBlockIds = Sets.newHashSet();
+
+  // max block index; must be >= 0
+  protected int maxBlockIndex = 0;
+
+  protected long currentWindowId;
+
+  /**
+   * This lock is used for adding/removing block(s)
+   */
+  protected transient ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  protected BlockReleaseStrategy releaseStrategy = new DefaultBlockReleaseStrategy();
+
+  public WindowedBlockStream()
+  {
+    super();
+  }
+
+  public WindowedBlockStream(int blockCapacity)
+  {
+    super(blockCapacity);
+  }
+
+  /**
+   * Records the new window id and makes sure the window starts on a block of its own.
+   *
+   * @param windowId id of the window that begins
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    moveToNextWindow();
+  }
+
+  /**
+   * Make sure different windows will not share any blocks. The current block is reused when it
+   * holds no data; otherwise the stream moves to the next block. The block in use afterwards is
+   * registered under the current window.
+   *
+   * @throws IllegalStateException if the current block is not clear
+   */
+  protected void moveToNextWindow()
+  {
+    // the window-to-block bookkeeping is also touched by completeWindow(), so guard it
+    lock.writeLock().lock();
+    try {
+      //use current block if it hasn't been used, else, move to next block
+      Block block = getOrCreateCurrentBlock();
+      if (!block.isClear()) {
+        throw new IllegalStateException(
+            "Current block not clear, should NOT move to next window. Please call toSlice() to output data first");
+      }
+      if (block.size() > 0) {
+        moveToNextBlock();
+      }
+      windowToBlockIds.put(currentWindowId, currentBlockIndex);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * This method tries to use a free block first. Allocate a new block if there
+   * are no free blocks
+   *
+   * @return The previous block
+   */
+  @Override
+  protected Block moveToNextBlock()
+  {
+    lock.writeLock().lock();
+    try {
+      Block previousBlock = currentBlock;
+      Iterator<Integer> freeIds = freeBlockIds.iterator();
+      if (freeIds.hasNext()) {
+        // recycle any free block and take it out of the free pool
+        currentBlockIndex = freeIds.next();
+        freeIds.remove();
+        currentBlock = this.blocks.get(currentBlockIndex);
+      } else {
+        currentBlockIndex = ++maxBlockIndex;
+        currentBlock = getOrCreateCurrentBlock();
+      }
+      windowToBlockIds.put(currentWindowId, currentBlockIndex);
+      return previousBlock;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Gives the release strategy a chance to drop surplus free blocks.
+   */
+  @Override
+  public void endWindow()
+  {
+    releaseMemory();
+  }
+
+  /**
+   * Resets the blocks of every tracked window with an id less than or equal to windowId.
+   *
+   * @param windowId upper bound (inclusive) of the window ids to reset
+   */
+  @Override
+  public void completeWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      // iterate over a copy because resetWindow() removes keys from windowToBlockIds
+      Set<Long> windIds = Sets.newHashSet(windowToBlockIds.keySet());
+      for (long windId : windIds) {
+        if (windId <= windowId) {
+          resetWindow(windId);
+        }
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Resets all blocks registered under the given window, returns them to the free pool and
+   * subtracts their data size from the inherited size counter.
+   *
+   * @param windowId id of the window whose blocks are reset
+   */
+  protected void resetWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      Set<Integer> removedBlockIds = windowToBlockIds.removeAll(windowId);
+
+      // accumulate in a long so that many large blocks cannot overflow the running total
+      long removedSize = 0;
+      for (int blockId : removedBlockIds) {
+        Block theBlock = blocks.get(blockId);
+        removedSize += theBlock.size();
+        theBlock.reset();
+        if (theBlock == currentBlock) {
+          //the client code could ask reset up to current window
+          //but the reset block should not be current block. current block should be reassigned.
+          moveToNextBlock();
+        }
+        logger.debug("reset block: {}, currentBlock: {}", blockId, theBlock);
+      }
+
+      freeBlockIds.addAll(removedBlockIds);
+      size -= removedSize;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Delegates to the parent reset, then marks every block except the current one as free.
+   */
+  @Override
+  public void reset()
+  {
+    lock.writeLock().lock();
+    try {
+      super.reset();
+
+      //all blocks are free now except the current one
+      // NOTE(review): windowToBlockIds is left untouched here - confirm stale entries are intended
+      freeBlockIds.addAll(blocks.keySet());
+      freeBlockIds.remove(currentBlockIndex);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * The size of the data of all windows with id less than or equals to windowId
+   *
+   * @param windowId the upper bound (inclusive) of the window ids to count
+   * @return the accumulated data size of the blocks of all matching windows
+   */
+  public long dataSizeUpToWindow(long windowId)
+  {
+    lock.readLock().lock();
+    try {
+      long totalSize = 0;
+      for (long winId : windowToBlockIds.keySet()) {
+        // only windows up to (and including) windowId count; newer windows are skipped
+        if (winId <= windowId) {
+          totalSize += dataSizeOfWindow(winId);
+        }
+      }
+      return totalSize;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * The size of the data held by the blocks of a single window.
+   *
+   * @param windowId id of the window
+   * @return the summed size of the window's blocks; 0 when the window has no blocks
+   */
+  protected long dataSizeOfWindow(long windowId)
+  {
+    lock.readLock().lock();
+    try {
+      long sizeOfWindow = 0;
+      Set<Integer> blockIds = windowToBlockIds.get(windowId);
+      if (blockIds != null) {
+        for (int blockId : blockIds) {
+          sizeOfWindow += blocks.get(blockId).size();
+        }
+      }
+      return sizeOfWindow;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Reports the number of free blocks to the release strategy and releases as many free blocks
+   * as the strategy asks for, but never more than are free.
+   */
+  public void releaseMemory()
+  {
+    int releasedBlocks = 0;
+    // removing blocks must not interleave with moveToNextBlock()/resetWindow()
+    lock.writeLock().lock();
+    try {
+      /**
+       * report and release extra blocks
+       */
+      releaseStrategy.currentFreeBlocks(freeBlockIds.size());
+      int releasingBlocks = Math.min(releaseStrategy.getNumBlocksToRelease(), freeBlockIds.size());
+      Iterator<Integer> iter = freeBlockIds.iterator();
+      while (releasedBlocks < releasingBlocks) {
+        //release blocks
+        int blockId = iter.next();
+        iter.remove();
+        blocks.remove(blockId);
+        releasedBlocks++;
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    /**
+     * report number of released blocks
+     */
+    if (releasedBlocks > 0) {
+      releaseStrategy.releasedBlocks(releasedBlocks);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java index da44fb1..b88501e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java @@ -34,7 +34,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; /** * Spillable session windowed storage. @@ -53,7 +52,7 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye if (keyToWindowsMap == null) { // NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on. 
// This is logged in APEXMALHAR-2271 - keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>, Slice>)(Serde)windowSerde); + keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java index ac386ab..ef111b3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java @@ -27,15 +27,14 @@ import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.state.spillable.Spillable; import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.GenericSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice; import org.apache.apex.malhar.lib.window.Window; import org.apache.apex.malhar.lib.window.WindowedStorage; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; /** * Implementation of WindowedKeyedStorage using {@link Spillable} data structures @@ -48,10 +47,10 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind @NotNull protected SpillableComplexComponent scc; protected long bucket; - protected Serde<Window, Slice> windowSerde; - protected Serde<Pair<Window, K>, Slice> windowKeyPairSerde; - 
protected Serde<K, Slice> keySerde; - protected Serde<V, Slice> valueSerde; + protected Serde<Window> windowSerde; + protected Serde<Pair<Window, K>> windowKeyPairSerde; + protected Serde<K> keySerde; + protected Serde<V> valueSerde; protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap; protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap; @@ -96,7 +95,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind } public SpillableWindowedKeyedStorage(long bucket, - Serde<Window, Slice> windowSerde, Serde<Pair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde) + Serde<Window> windowSerde, Serde<Pair<Window, K>> windowKeyPairSerde, Serde<K> keySerde, Serde<V> valueSerde) { this.bucket = bucket; this.windowSerde = windowSerde; @@ -120,17 +119,17 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind this.bucket = bucket; } - public void setWindowSerde(Serde<Window, Slice> windowSerde) + public void setWindowSerde(Serde<Window> windowSerde) { this.windowSerde = windowSerde; } - public void setWindowKeyPairSerde(Serde<Pair<Window, K>, Slice> windowKeyPairSerde) + public void setWindowKeyPairSerde(Serde<Pair<Window, K>> windowKeyPairSerde) { this.windowKeyPairSerde = windowKeyPairSerde; } - public void setValueSerde(Serde<V, Slice> valueSerde) + public void setValueSerde(Serde<V> valueSerde) { this.valueSerde = valueSerde; } @@ -168,16 +167,16 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind } // set default serdes if (windowSerde == null) { - windowSerde = new SerdeKryoSlice<>(); + windowSerde = new GenericSerde<>(); } if (windowKeyPairSerde == null) { - windowKeyPairSerde = new SerdeKryoSlice<>(); + windowKeyPairSerde = new GenericSerde<>(); } if (keySerde == null) { - keySerde = new SerdeKryoSlice<>(); + keySerde = new GenericSerde<>(); } if (valueSerde == null) { - valueSerde = new SerdeKryoSlice<>(); + 
valueSerde = new GenericSerde<>(); } if (windowKeyToValueMap == null) { @@ -220,5 +219,4 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind { return windowKeyToValueMap.get(new ImmutablePair<>(window, key)); } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java index 6666381..9a8a291 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java @@ -24,13 +24,12 @@ import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.state.spillable.Spillable; import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; +import org.apache.apex.malhar.lib.utils.serde.GenericSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice; import org.apache.apex.malhar.lib.window.Window; import org.apache.apex.malhar.lib.window.WindowedStorage; import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; /** * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures @@ -42,8 +41,8 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe @NotNull private SpillableComplexComponent scc; private long bucket; - private Serde<Window, Slice> windowSerde; - private Serde<T, Slice> valueSerde; + private Serde<Window> windowSerde; + private Serde<T> valueSerde; protected Spillable.SpillableMap<Window, T> windowToDataMap; @@ -51,7 +50,7 @@ public 
class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe { } - public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde) + public SpillableWindowedPlainStorage(long bucket, Serde<Window> windowSerde, Serde<T> valueSerde) { this.bucket = bucket; this.windowSerde = windowSerde; @@ -73,12 +72,12 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe this.bucket = bucket; } - public void setWindowSerde(Serde<Window, Slice> windowSerde) + public void setWindowSerde(Serde<Window> windowSerde) { this.windowSerde = windowSerde; } - public void setValueSerde(Serde<T, Slice> valueSerde) + public void setValueSerde(Serde<T> valueSerde) { this.valueSerde = valueSerde; } @@ -128,10 +127,10 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe } // set default serdes if (windowSerde == null) { - windowSerde = new SerdeKryoSlice<>(); + windowSerde = new GenericSerde<>(); } if (valueSerde == null) { - valueSerde = new SerdeKryoSlice<>(); + valueSerde = new GenericSerde<>(); } if (windowToDataMap == null) { windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde); @@ -142,5 +141,4 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe public void teardown() { } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/com/datatorrent/lib/util/TestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java index 403072d..92937a9 100644 --- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java +++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java @@ -24,6 +24,7 @@ import java.util.List; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import 
org.apache.apex.malhar.lib.utils.serde.BufferSlice; import org.apache.commons.io.FileUtils; import com.google.common.base.Preconditions; @@ -57,7 +58,7 @@ public class TestUtils public static Slice getSlice(int val) { - return new Slice(getBytes(val)); + return new BufferSlice(getBytes(val)); } public static class TestInfo extends TestWatcher http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java index 2058b69..6645a98 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java @@ -28,6 +28,12 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource; +import org.apache.apex.malhar.lib.utils.serde.AffixSerde; +import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; +import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream; + import com.google.common.primitives.Longs; import com.datatorrent.lib.fileaccess.FileAccess; @@ -82,6 +88,7 @@ public class DefaultBucketTest Assert.assertNull("value not present", value); Assert.assertEquals("size of bucket", one.length * 2 + Longs.BYTES, testMeta.defaultBucket.getSizeInBytes()); + testMeta.defaultBucket.teardown(); } @@ -126,7 +133,6 @@ public class DefaultBucketTest Slice one = ManagedStateTestUtils.getSliceFor("1"); testPut(); Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10); - Assert.assertEquals("size", 1, unsaved.size()); 
Map.Entry<Slice, Bucket.BucketedValue> entry = unsaved.entrySet().iterator().next(); Assert.assertEquals("key", one, entry.getKey()); @@ -192,15 +198,39 @@ public class DefaultBucketTest testGetFromReader(); long initSize = testMeta.defaultBucket.getSizeInBytes(); - Slice two = ManagedStateTestUtils.getSliceFor("2"); - testMeta.defaultBucket.put(two, 101, two); - - Assert.assertEquals("size", initSize + (two.length * 2 + Longs.BYTES ), testMeta.defaultBucket.getSizeInBytes()); + //The temporary memory generated by get was not managed by bucket, only put was managed by bucket + SerializationBuffer buffer = new SerializationBuffer(testMeta.defaultBucket.getKeyStream()); + byte[] keyPrefix = new byte[]{0}; + String key = "1"; + String value = "2"; + AffixSerde<String> keySerde = new AffixSerde<>(keyPrefix, new StringSerde(), null); + + StringSerde valueSerde = new StringSerde(); + + testMeta.defaultBucket.getKeyStream().beginWindow(1); + testMeta.defaultBucket.getValueStream().beginWindow(1); + keySerde.serialize(key, buffer); + Slice keySlice = buffer.toSlice(); + valueSerde.serialize(value, buffer); + Slice valueSlice = buffer.toSlice(); + testMeta.defaultBucket.put(keySlice, 1, valueSlice); + testMeta.defaultBucket.getKeyStream().endWindow(); + testMeta.defaultBucket.getValueStream().endWindow(); + + long currentSize = testMeta.defaultBucket.getSizeInBytes(); + testMeta.defaultBucket.freeMemory(Long.MAX_VALUE); + //call this method to invoke the release memory + testMeta.defaultBucket.get(keySlice, -1, ReadSource.MEMORY); + long sizeFreed = currentSize - testMeta.defaultBucket.getSizeInBytes(); + + SerializationBuffer tmpBuffer = new SerializationBuffer(new WindowedBlockStream()); + tmpBuffer.writeBytes(keyPrefix); + tmpBuffer.writeString(key); + tmpBuffer.writeString(value); + int expectedFreedSize = tmpBuffer.toSlice().toByteArray().length; //key prefix, key length, key; value length, value + Assert.assertEquals("size freed", expectedFreedSize, sizeFreed); + 
Assert.assertEquals("existing size", currentSize - expectedFreedSize, testMeta.defaultBucket.getSizeInBytes()); - long sizeFreed = testMeta.defaultBucket.freeMemory(Long.MAX_VALUE); - Assert.assertEquals("size freed", initSize, sizeFreed); - Assert.assertEquals("existing size", (two.length * 2 + Longs.BYTES), testMeta.defaultBucket.getSizeInBytes()); testMeta.defaultBucket.teardown(); } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java index 0d3f87a..86f8430 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java @@ -28,6 +28,7 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.utils.serde.BufferSlice; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.RemoteIterator; @@ -128,6 +129,6 @@ public class ManagedStateTestUtils public static Slice getSliceFor(String x) { - return new Slice(x.getBytes()); + return new BufferSlice(x.getBytes()); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java index af05c88..5dd6404 100644 --- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java @@ -23,7 +23,7 @@ import org.junit.Rule; import org.junit.Test; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; import com.google.common.collect.Lists; @@ -58,7 +58,7 @@ public class SpillableArrayListImplTest public void simpleAddGetAndSetTest1Helper(SpillableStateStore store) { SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store, - new SerdeStringSlice(), 1); + new StringSerde(), 1); store.setup(testMeta.operatorContext); list.setup(testMeta.operatorContext); @@ -177,7 +177,7 @@ public class SpillableArrayListImplTest private void simpleAddGetAndSetTest3Helper(SpillableStateStore store) { SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store, - new SerdeStringSlice(), 3); + new StringSerde(), 3); store.setup(testMeta.operatorContext); list.setup(testMeta.operatorContext); @@ -321,10 +321,10 @@ public class SpillableArrayListImplTest public void simpleMultiListTestHelper(SpillableStateStore store) { SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, ID1, store, - new SerdeStringSlice(), 1); + new StringSerde(), 1); SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, ID2, store, - new SerdeStringSlice(), 1); + new StringSerde(), 1); store.setup(testMeta.operatorContext); list1.setup(testMeta.operatorContext); @@ -483,7 +483,7 @@ public class SpillableArrayListImplTest SpillableStateStore store = testMeta.store; SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store, - new SerdeStringSlice(), 3); + new StringSerde(), 3); store.setup(testMeta.operatorContext); 
list.setup(testMeta.operatorContext); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java index 82fb340..d21bf50 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java @@ -26,9 +26,11 @@ import org.junit.Rule; import org.junit.Test; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.IntSerde; +import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; +import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream; import com.google.common.collect.Lists; @@ -63,8 +65,8 @@ public class SpillableArrayListMultimapImplTest public void simpleMultiKeyTestHelper(SpillableStateStore store) { SpillableArrayListMultimapImpl<String, String> map = - new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new SerdeStringSlice(), - new SerdeStringSlice()); + new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(), + new StringSerde()); store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -112,11 +114,11 @@ public class SpillableArrayListMultimapImplTest public long simpleMultiKeyTestHelper(SpillableStateStore 
store, SpillableArrayListMultimapImpl<String, String> map, String key, long nextWindowId) { - SerdeStringSlice serdeString = new SerdeStringSlice(); - SerdeIntSlice serdeInt = new SerdeIntSlice(); - - Slice keySlice = serdeString.serialize(key); - + StringSerde serdeString = new StringSerde(); + IntSerde serdeInt = new IntSerde(); + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serdeString.serialize(key, buffer); + Slice keySlice = buffer.toSlice(); byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); nextWindowId++; @@ -249,7 +251,7 @@ public class SpillableArrayListMultimapImplTest SpillableStateStore store = testMeta.store; SpillableArrayListMultimapImpl<String, String> map = - new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new StringSerde(), new StringSerde()); store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -323,8 +325,10 @@ public class SpillableArrayListMultimapImplTest store.beginWindow(nextWindowId); map.beginWindow(nextWindowId); - SerdeStringSlice serdeString = new SerdeStringSlice(); - Slice keySlice = serdeString.serialize("a"); + StringSerde serdeString = new StringSerde(); + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serdeString.serialize("a", buffer); + Slice keySlice = buffer.toSlice(); byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", @@ -350,7 +354,7 @@ public class SpillableArrayListMultimapImplTest SpillableStateStore store = testMeta.store; SpillableArrayListMultimapImpl<String, String> multimap = new SpillableArrayListMultimapImpl<>( - this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + this.testMeta.store, ID1, 0L, new StringSerde(), new StringSerde()); 
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java index 5c477b1..29c2090 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java @@ -22,7 +22,7 @@ import org.junit.Rule; import org.junit.Test; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; public class SpillableComplexComponentImplTest { @@ -48,9 +48,9 @@ public class SpillableComplexComponentImplTest SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store); Spillable.SpillableComponent scList = - (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new SerdeStringSlice()); + (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new StringSerde()); Spillable.SpillableComponent scMap = - (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new SerdeStringSlice(), new SerdeStringSlice()); + (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new StringSerde(), new StringSerde()); sccImpl.setup(testMeta.operatorContext); 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java index e8aea46..a96a8fd 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java @@ -23,7 +23,7 @@ import org.junit.Rule; import org.junit.Test; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; @@ -47,6 +47,7 @@ public class SpillableMapImplTest simpleGetAndPutTestHelper(store); } + @Test public void simpleGetAndPutManagedStateTest() { @@ -55,11 +56,7 @@ public class SpillableMapImplTest private void simpleGetAndPutTestHelper(SpillableStateStore store) { - SerdeStringSlice sss = new SerdeStringSlice(); - - SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); + SpillableMapImpl<String, String> map = createSpillableMap(store); store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -157,23 +154,25 @@ public class SpillableMapImplTest public void simpleRemoveTest() { InMemSpillableStateStore store = new InMemSpillableStateStore(); - simpleRemoveTestHelper(store); } + @Test public void simpleRemoveManagedStateTest() { simpleRemoveTestHelper(testMeta.store); } - private void simpleRemoveTestHelper(SpillableStateStore store) + protected SpillableMapImpl<String, String> 
createSpillableMap(SpillableStateStore store) { - SerdeStringSlice sss = new SerdeStringSlice(); + return new SpillableMapImpl<String, String>(store, ID1, 0L, new StringSerde(), + new StringSerde()); + } - SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); + private void simpleRemoveTestHelper(SpillableStateStore store) + { + SpillableMapImpl<String, String> map = createSpillableMap(store); store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -324,14 +323,14 @@ public class SpillableMapImplTest public void multiMapPerBucketTestHelper(SpillableStateStore store) { - SerdeStringSlice sss = new SerdeStringSlice(); + StringSerde sss = new StringSerde(); SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); + new StringSerde(), + new StringSerde()); SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); + new StringSerde(), + new StringSerde()); store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); @@ -413,11 +412,11 @@ public class SpillableMapImplTest @Test public void recoveryWithManagedStateTest() throws Exception { - SerdeStringSlice sss = new SerdeStringSlice(); + StringSerde sss = new StringSerde(); SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(testMeta.store, ID1, 0L, - new SerdeStringSlice(), - new SerdeStringSlice()); + new StringSerde(), + new StringSerde()); testMeta.store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java ---------------------------------------------------------------------- diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java index 3883191..d0343e1 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java @@ -26,7 +26,7 @@ import org.junit.Rule; import org.junit.Test; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; import com.google.common.collect.Lists; @@ -53,7 +53,7 @@ public class SpillableSetImplTest public void simpleAddGetAndSetTest1Helper(SpillableStateStore store) { - SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new SerdeStringSlice()); + SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde()); store.setup(testMeta.operatorContext); set.setup(testMeta.operatorContext); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java index 15970af..2f80628 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java @@ -27,7 +27,8 @@ import org.junit.Rule; import org.junit.Test; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; -import 
org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -62,8 +63,7 @@ public class SpillableSetMultimapImplTest public void simpleMultiKeyTestHelper(SpillableStateStore store) { SpillableSetMultimapImpl<String, String> map = - new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), - new SerdeStringSlice()); + new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde()); store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -201,7 +201,7 @@ public class SpillableSetMultimapImplTest SpillableStateStore store = testMeta.store; SpillableSetMultimapImpl<String, String> map = - new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde()); store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -276,8 +276,9 @@ public class SpillableSetMultimapImplTest final int numOfEntry = 100000; SpillableStateStore store = testMeta.store; - SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>( - this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + + SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>(testMeta.store, ID1, 0L, + createStringSerde(), createStringSerde()); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); @@ -294,4 +295,9 @@ public class SpillableSetMultimapImplTest multimap.endWindow(); store.endWindow(); } + + protected Serde<String> createStringSerde() + { + return new StringSerde(); + } } 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java index 36e3557..d72b1f9 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java @@ -27,14 +27,15 @@ import org.junit.runner.Description; import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils; import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.utils.serde.CollectionSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice; -import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice; -import org.apache.apex.malhar.lib.utils.serde.SliceUtils; -import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; +import org.apache.apex.malhar.lib.utils.serde.StringSerde; +import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream; + +import com.esotericsoftware.kryo.io.Input; import com.datatorrent.api.Context; -import com.datatorrent.lib.appdata.gpo.GPOUtils; import com.datatorrent.lib.fileaccess.FileAccessFSImpl; import com.datatorrent.lib.util.TestUtils; import com.datatorrent.netlet.util.Slice; @@ -44,9 +45,9 @@ import com.datatorrent.netlet.util.Slice; */ public class SpillableTestUtils { - public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice(); - public static SerdeCollectionSlice<String, List<String>> SERDE_STRING_LIST_SLICE = new SerdeCollectionSlice<>(new SerdeStringSlice(), 
- (Class<List<String>>)(Class)ArrayList.class); + public static StringSerde STRING_SERDE = new StringSerde(); + public static CollectionSerde<String, List<String>> STRING_LIST_SERDE = new CollectionSerde<>(new StringSerde(), + (Class)ArrayList.class); private SpillableTestUtils() { @@ -77,34 +78,41 @@ public class SpillableTestUtils } } + protected static SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + public static Slice getKeySlice(byte[] id, String key) { - return SliceUtils.concatenate(id, SERDE_STRING_SLICE.serialize(key)); + buffer.writeBytes(id); + STRING_SERDE.serialize(key, buffer); + return buffer.toSlice(); } public static Slice getKeySlice(byte[] id, int index, String key) { - return SliceUtils.concatenate(id, - SliceUtils.concatenate(GPOUtils.serializeInt(index), - SERDE_STRING_SLICE.serialize(key))); + buffer.writeBytes(id); + buffer.writeInt(index); + STRING_SERDE.serialize(key, buffer); + return buffer.toSlice(); } public static void checkValue(SpillableStateStore store, long bucketId, String key, byte[] prefix, String expectedValue) { - checkValue(store, bucketId, SliceUtils.concatenate(prefix, SERDE_STRING_SLICE.serialize(key)).buffer, - expectedValue, 0, SERDE_STRING_SLICE); + buffer.writeBytes(prefix); + STRING_SERDE.serialize(key, buffer); + checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 0, STRING_SERDE); } public static void checkValue(SpillableStateStore store, long bucketId, byte[] prefix, int index, List<String> expectedValue) { - checkValue(store, bucketId, SliceUtils.concatenate(prefix, GPOUtils.serializeInt(index)), expectedValue, 0, - SERDE_STRING_LIST_SLICE); + buffer.writeBytes(prefix); + buffer.writeInt(index); + checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 0, STRING_LIST_SERDE); } - public static <T> void checkValue(SpillableStateStore store, long bucketId, byte[] bytes, - T expectedValue, int offset, Serde<T, Slice> serde) + public static 
<T> void checkValue(SpillableStateStore store, long bucketId, byte[] bytes, + T expectedValue, int offset, Serde<T> serde) { Slice slice = store.getSync(bucketId, new Slice(bytes)); @@ -116,7 +124,7 @@ public class SpillableTestUtils } } - T string = serde.deserialize(slice, new MutableInt(offset)); + T string = serde.deserialize(new Input(slice.buffer, slice.offset + offset, slice.length)); Assert.assertEquals(expectedValue, string); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java index 8033a7d..a2cbb54 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java @@ -25,9 +25,6 @@ import org.junit.Test; import com.google.common.collect.Sets; -/** - * Created by tfarkas on 6/4/16. - */ public class TimeBasedPriorityQueueTest { @Test http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java new file mode 100644 index 0000000..007fab9 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.junit.Assert; +import org.junit.Test; + +import com.esotericsoftware.kryo.io.Input; + +import com.datatorrent.netlet.util.Slice; + +public class AffixSerdeTest +{ + @Test + public void simpleTest() + { + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + AffixSerde<String> serde = new AffixSerde<>(new byte[]{1, 2, 3}, new StringSerde(), new byte[]{9}); + + final String orgValue = "abc"; + serde.serialize(orgValue, buffer); + Slice slice = buffer.toSlice(); + + String value = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length)); + Assert.assertEquals(orgValue, value); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java new file mode 100644 index 0000000..3b39d6c --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import com.datatorrent.netlet.util.Slice; + +public class BlockStreamTest +{ + protected Random random = new Random(); + + @Test + public void testWindowedBlockStream() + { + WindowedBlockStream bs = new WindowedBlockStream(); + List<byte[]> totalList = Lists.newArrayList(); + List<Slice> slices = Lists.newArrayList(); + + for (int windowId = 0; windowId < 10; ++windowId) { + List<byte[]> list = generateList(); + totalList.addAll(list); + + bs.beginWindow(windowId); + writeToBlockStream(bs, list, slices); + bs.endWindow(); + + if (windowId % 2 != 0) { + verify(totalList, slices); + + bs.completeWindow(windowId); + totalList.clear(); + slices.clear(); + } + } + } + + @Test + public void testBlockStream() + { + BlockStream bs = new BlockStream(); + List<byte[]> totalList = Lists.newArrayList(); + List<Slice> slices = Lists.newArrayList(); + + for (int tryTime = 0; tryTime < 10; ++tryTime) { + List<byte[]> list = generateList(); + totalList.addAll(list); + + 
writeToBlockStream(bs, list, slices); + + if (tryTime % 2 != 0) { + verify(totalList, slices); + + bs.reset(); + totalList.clear(); + slices.clear(); + } + + } + } + + private void writeToBlockStream(BlockStream bs, List<byte[]> list, List<Slice> slices) + { + for (byte[] bytes : list) { + int times = random.nextInt(100) + 1; + int remainLen = bytes.length; + int offset = 0; + while (times > 0 && remainLen > 0) { + int avgSubLen = remainLen / times; + times--; + if (avgSubLen == 0) { + bs.write(bytes, offset, remainLen); + break; + } + + int writeLen = remainLen; + if (times != 0) { + int subLen = random.nextInt(avgSubLen * 2); + writeLen = Math.min(subLen, remainLen); + } + bs.write(bytes, offset, writeLen); + + offset += writeLen; + remainLen -= writeLen; + } + slices.add(bs.toSlice()); + } + } + + private void verify(List<byte[]> list, List<Slice> slices) + { + //verify + Assert.assertTrue("size not equal.", list.size() == slices.size()); + + for (int i = 0; i < list.size(); ++i) { + byte[] bytes = list.get(i); + byte[] newBytes = slices.get(i).toByteArray(); + if (!Arrays.equals(bytes, newBytes)) { + Assert.assertArrayEquals(bytes, newBytes); + } + } + } + + private List<byte[]> generateList() + { + List<byte[]> list = Lists.newArrayList(); + int size = random.nextInt(10000) + 1; + for (int i = 0; i < size; i++) { + list.add(generateByteArray()); + } + return list; + } + + protected byte[] generateByteArray() + { + int len = random.nextInt(10000) + 1; + byte[] bytes = new byte[len]; + random.nextBytes(bytes); + return bytes; + } + + + @Test + public void testReleaseMemory() + { + WindowedBlockStream stream = new WindowedBlockStream(); + + byte[] data = new byte[2048]; + final int loopPerWindow = 100; + long windowId = 0; + + //fill data; + for (; windowId < 100; ++windowId) { + stream.beginWindow(windowId); + for (int i = 0; i < loopPerWindow; ++i) { + stream.write(data); + stream.toSlice(); + } + stream.endWindow(); + } + + long capacity = stream.capacity(); + 
stream.completeWindow(windowId); + Assert.assertTrue(capacity == stream.capacity()); + Assert.assertTrue(0 == stream.size()); + + //release memory; + for (; windowId < 200; ++windowId) { + stream.beginWindow(windowId); + stream.endWindow(); + } + + //at least keep one block as current block + Assert.assertTrue(stream.capacity() == Block.DEFAULT_BLOCK_SIZE); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java new file mode 100644 index 0000000..255d9c0 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.Assert; +import org.junit.Test; + +import com.esotericsoftware.kryo.io.Input; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import com.datatorrent.netlet.util.Slice; + +public class CollectionSerdeTest +{ + @Test + public void testSerdeList() + { + CollectionSerde<String, List<String>> serdeList = + new CollectionSerde<>(new StringSerde(), (Class)ArrayList.class); + + List<String> stringList = Lists.newArrayList("a", "b", "c"); + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serdeList.serialize(stringList, buffer); + + Slice slice = buffer.toSlice(); + List<String> deserializedList = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length)); + + Assert.assertEquals(stringList, deserializedList); + } + + @Test + public void testSerdeSet() + { + CollectionSerde<String, Set<String>> serdeSet = + new CollectionSerde<>(new StringSerde(), (Class)HashSet.class); + + Set<String> stringList = Sets.newHashSet("a", "b", "c"); + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serdeSet.serialize(stringList, buffer); + + Slice slice = buffer.toSlice(); + Set<String> deserializedSet = serdeSet.deserialize(new Input(slice.buffer, slice.offset, slice.length)); + + Assert.assertEquals(stringList, deserializedSet); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java new file mode 100644 index 0000000..34b7088 --- /dev/null +++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +import com.esotericsoftware.kryo.io.Input; +import com.google.common.collect.Lists; + +import com.datatorrent.netlet.util.Slice; + +/** + * SerdeKryoSlice unit tests + */ +public class GenericSerdeTest +{ + public static class TestPojo + { + private TestPojo() + { + } + + public TestPojo(int intValue, String stringValue) + { + this.intValue = intValue; + this.stringValue = stringValue; + } + + @Override + public boolean equals(Object other) + { + TestPojo o = (TestPojo)other; + return intValue == o.intValue && stringValue.equals(o.stringValue); + } + + int intValue; + String stringValue; + } + + @Test + public void stringListTest() + { + GenericSerde<ArrayList> serdeList = new GenericSerde<>(ArrayList.class); + + ArrayList<String> stringList = Lists.newArrayList("a", "b", "c"); + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serdeList.serialize(stringList, buffer); + 
Slice slice = buffer.toSlice(); + List<String> deserializedList = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length)); + Assert.assertEquals(stringList, deserializedList); + } + + @Test + public void pojoTest() + { + GenericSerde<TestPojo> serdePojo = new GenericSerde<>(); + TestPojo pojo = new TestPojo(345, "xyz"); + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serdePojo.serialize(pojo, buffer); + Slice slice = buffer.toSlice(); + TestPojo deserializedPojo = serdePojo.deserialize(new Input(slice.buffer, slice.offset, slice.length)); + Assert.assertEquals(pojo, deserializedPojo); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java new file mode 100644 index 0000000..104ff04 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import com.esotericsoftware.kryo.io.Input; + +import com.datatorrent.netlet.util.Slice; + +public class PairSerdeTest +{ + @Test + public void simpleSerdeTest() + { + PairSerde<String, Integer> serdePair = new PairSerde<>(new StringSerde(), new IntSerde()); + + Pair<String, Integer> pair = new ImmutablePair<>("abc", 123); + + SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream()); + serdePair.serialize(pair, buffer); + Slice slice = buffer.toSlice(); + + Pair<String, Integer> deserializedPair = serdePair.deserialize(new Input(slice.buffer, slice.offset, slice.length)); + + Assert.assertEquals(pair, deserializedPair); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java deleted file mode 100644 index 3cb5b65..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import org.apache.commons.lang3.mutable.MutableInt; - -public class PassThruByteArraySerdeTest -{ - @Rule - public SerdeByteArrayToByteArrayTestWatcher testMeta = new SerdeByteArrayToByteArrayTestWatcher(); - - public static class SerdeByteArrayToByteArrayTestWatcher extends TestWatcher - { - public PassThruByteArraySerde serde; - - @Override - protected void starting(Description description) - { - this.serde = new PassThruByteArraySerde(); - super.starting(description); - } - } - - @Test - public void simpleSerializeTest() - { - byte[] byteArray = new byte[]{1, 2, 3}; - byte[] serialized = testMeta.serde.serialize(byteArray); - - Assert.assertArrayEquals(byteArray, serialized); - } - - @Test - public void simpleDeserializeTest() - { - byte[] byteArray = new byte[]{1, 2, 3}; - byte[] serialized = testMeta.serde.deserialize(byteArray); - - Assert.assertArrayEquals(byteArray, serialized); - } - - @Test - public void simpleDeserializeOffsetTest() - { - byte[] byteArray = new byte[]{1, 2, 3}; - byte[] serialized = testMeta.serde.deserialize(byteArray, new MutableInt(0)); - - Assert.assertArrayEquals(byteArray, serialized); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java 
---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java deleted file mode 100644 index f6085f6..0000000 --- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.lib.utils.serde; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import com.datatorrent.netlet.util.Slice; - -public class SerdeCollectionSliceTest -{ - @Test - public void testSerdeList() - { - SerdeCollectionSlice<String, List<String>> serdeList = - new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<List<String>>)(Class)ArrayList.class); - - List<String> stringList = Lists.newArrayList("a", "b", "c"); - - Slice slice = serdeList.serialize(stringList); - - List<String> deserializedList = serdeList.deserialize(slice); - - Assert.assertEquals(stringList, deserializedList); - } - - @Test - public void testSerdeSet() - { - SerdeCollectionSlice<String, Set<String>> serdeSet = - new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<Set<String>>)(Class)HashSet.class); - - Set<String> stringList = Sets.newHashSet("a", "b", "c"); - - Slice slice = serdeSet.serialize(stringList); - - Set<String> deserializedSet = serdeSet.deserialize(slice); - - Assert.assertEquals(stringList, deserializedSet); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java new file mode 100644 index 0000000..ee24557 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Round-trip tests for {@link IntSerde}, {@link StringSerde}, {@link ArraySerde} and
+ * {@link CollectionSerde}. Each value is serialized into a {@link SerializationBuffer}
+ * created over a {@link WindowedBlockStream}, then deserialized from an {@link Input}
+ * built on the {@link Slice} returned by the buffer, and compared with the original.
+ */
+public class SerdeGeneralTest
+{
+  /** Alphabet size for the random test data: 10 digits + 26 upper case + 26 lower case letters. */
+  private final int charNum = 62;
+  private String[] testData = null;
+  private final Random random = new Random();
+
+  /**
+   * Fills {@link #testData} with 1 to 10000 random strings, each 1 to 10000 characters
+   * long, before a test runs.
+   */
+  @Before
+  public void generateTestData()
+  {
+    int size = random.nextInt(10000) + 1;
+    testData = new String[size];
+    for (int i = 0; i < size; ++i) {
+      char[] chars = new char[random.nextInt(10000) + 1];
+      for (int j = 0; j < chars.length; ++j) {
+        chars[j] = getRandomChar();
+      }
+
+      testData[i] = new String(chars);
+    }
+  }
+
+  /**
+   * Picks a random character from {@code 0-9}, {@code A-Z} and {@code a-z}.
+   *
+   * @return a random ASCII alphanumeric character
+   */
+  private char getRandomChar()
+  {
+    int value = random.nextInt(charNum);
+    if (value < 10) {
+      return (char)('0' + value);
+    } else if (value < 36) {
+      // indexes 10..35 map to 'A'..'Z'; the base of the range must be subtracted first
+      return (char)('A' + (value - 10));
+    }
+    // indexes 36..61 map to 'a'..'z'
+    return (char)('a' + (value - 36));
+  }
+
+  /**
+   * Serializes a single int and checks that deserializing the resulting slice yields the same value.
+   */
+  @Test
+  public void testSerdeInt()
+  {
+    IntSerde intSerde = new IntSerde();
+
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    int value = 123;
+    intSerde.serialize(value, buffer);
+
+    Slice slice = buffer.toSlice();
+
+    int deserializedValue = intSerde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+    Assert.assertEquals(value, deserializedValue);
+  }
+
+  /** Round-trips every generated string individually through {@link StringSerde}. */
+  @Test
+  public void testSerdeString()
+  {
+    testSerde(testData, new StringSerde(), new StringSerdeVerifier());
+  }
+
+  /** Round-trips the generated strings as one {@code String[]} through {@link ArraySerde}. */
+  @Test
+  public void testSerdeArray()
+  {
+    testSerde(testData, ArraySerde.newSerde(new StringSerde(), String.class), new StringArraySerdeVerifier());
+  }
+
+
+  /** Round-trips the generated strings as one {@code List<String>} through {@link CollectionSerde}. */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testSerdeCollection()
+  {
+    CollectionSerde<String, List<String>> listSerde = new CollectionSerde<>(new StringSerde(), (Class)ArrayList.class);
+    testSerde(testData, listSerde, new StringListSerdeVerifier());
+  }
+
+
+  /**
+   * Runs {@code verifier} once in each of ten consecutive windows (ids 0 to 9) on a fresh
+   * buffer. Windows whose id is a multiple of 3 are completed, the buffer is reset after
+   * windows whose id is a multiple of 4, and the buffer is released at the end.
+   *
+   * @param strs the strings to serialize
+   * @param serde the serde under test
+   * @param verifier performs the serialization round trip and the assertions
+   */
+  public <T> void testSerde(String[] strs, Serde<T> serde, SerdeVerifier<T> verifier)
+  {
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+
+    for (int i = 0; i < 10; ++i) {
+      buffer.beginWindow(i);
+      verifier.verifySerde(strs, serde, buffer);
+      buffer.endWindow();
+      if (i % 3 == 0) {
+        buffer.completeWindow(i);
+      }
+      if (i % 4 == 0) {
+        buffer.reset();
+      }
+    }
+    buffer.release();
+  }
+
+  /**
+   * Strategy that serializes the test strings with a serde of type {@code T} and asserts
+   * that deserialization restores them.
+   */
+  public interface SerdeVerifier<T>
+  {
+    void verifySerde(String[] datas, Serde<T> serde, SerializationBuffer buffer);
+  }
+
+  /** Serializes each string on its own and compares it with its deserialized form. */
+  public static class StringSerdeVerifier implements SerdeVerifier<String>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<String> serde, SerializationBuffer buffer)
+    {
+      for (String str : datas) {
+        serde.serialize(str, buffer);
+        Slice slice = buffer.toSlice();
+        // assertEquals reports both the expected and the actual value on failure
+        Assert.assertEquals("serialize failed, String: " + str, str,
+            serde.deserialize(new Input(slice.buffer, slice.offset, slice.length)));
+      }
+    }
+  }
+
+
+  /** Serializes the whole array in one call and compares it element-wise with the deserialized array. */
+  public static class StringArraySerdeVerifier implements SerdeVerifier<String[]>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<String[]> serde, SerializationBuffer buffer)
+    {
+      serde.serialize(datas, buffer);
+      Slice slice = buffer.toSlice();
+      String[] newStrs = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+      Assert.assertArrayEquals("serialize array failed.", datas, newStrs);
+    }
+  }
+
+  /** Serializes the strings as a list in one call and compares the deserialized list with the input. */
+  public static class StringListSerdeVerifier implements SerdeVerifier<List<String>>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<List<String>> serdeList, SerializationBuffer buffer)
+    {
+      List<String> list = Arrays.asList(datas);
+
+      serdeList.serialize(list, buffer);
+      Slice slice = buffer.toSlice();
+      List<String> newStrs = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+      Assert.assertArrayEquals("serialize list failed.", datas, newStrs.toArray(new String[0]));
+
+      // NOTE(review): only this verifier resets the buffer itself; the reason is not visible here - confirm.
+      buffer.reset();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
deleted file mode 100644
index b780f66..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import java.util.ArrayList; -import java.util.List; - -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.collect.Lists; - -import com.datatorrent.netlet.util.Slice; - -/** - * SerdeKryoSlice unit tests - */ -public class SerdeKryoSliceTest -{ - public static class TestPojo - { - private TestPojo() - { - } - - public TestPojo(int intValue, String stringValue) - { - this.intValue = intValue; - this.stringValue = stringValue; - } - - @Override - public boolean equals(Object other) - { - TestPojo o = (TestPojo)other; - return intValue == o.intValue && stringValue.equals(o.stringValue); - } - - int intValue; - String stringValue; - } - - @Test - public void stringListTest() - { - SerdeKryoSlice<ArrayList> serdeList = new SerdeKryoSlice<>(ArrayList.class); - - ArrayList<String> stringList = Lists.newArrayList("a", "b", "c"); - Slice slice = serdeList.serialize(stringList); - List<String> deserializedList = serdeList.deserialize(slice); - Assert.assertEquals(stringList, deserializedList); - } - - @Test - public void pojoTest() - { - SerdeKryoSlice<TestPojo> serdePojo = new SerdeKryoSlice<>(); - TestPojo pojo = new TestPojo(345, "xyz"); - Slice slice = serdePojo.serialize(pojo); - TestPojo deserializedPojo = serdePojo.deserialize(slice); - Assert.assertEquals(pojo, deserializedPojo); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java deleted file mode 100644 index 6684a9f..0000000 --- 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.utils.serde; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; - -import com.datatorrent.netlet.util.Slice; - -public class SerdePairSliceTest -{ - @Test - public void simpleSerdeTest() - { - SerdePairSlice<String, Integer> serdePair = new SerdePairSlice<>(new SerdeStringSlice(), new SerdeIntSlice()); - - Pair<String, Integer> pair = new ImmutablePair<>("abc", 123); - - Slice slice = serdePair.serialize(pair); - - Pair<String, Integer> deserializedPair = serdePair.deserialize(slice); - - Assert.assertEquals(pair, deserializedPair); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java 
b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java index 3b7789c..a44e454 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java @@ -47,8 +47,14 @@ public class SpillableWindowedStorageTest Window window2 = new Window.TimeWindow<>(1010, 10); Window window3 = new Window.TimeWindow<>(1020, 10); storage.setSpillableComplexComponent(sccImpl); - storage.getSpillableComplexComponent().setup(testMeta.operatorContext); + + /* + * storage.setup() will create Spillable Data Structures + * storage.getSpillableComplexComponent().setup() will setup these Data Structures. + * So storage.setup() should be called before storage.getSpillableComplexComponent().setup() + */ storage.setup(testMeta.operatorContext); + storage.getSpillableComplexComponent().setup(testMeta.operatorContext); sccImpl.beginWindow(1000); storage.put(window1, 1); @@ -103,8 +109,15 @@ public class SpillableWindowedStorageTest Window window2 = new Window.TimeWindow<>(1010, 10); Window window3 = new Window.TimeWindow<>(1020, 10); storage.setSpillableComplexComponent(sccImpl); - storage.getSpillableComplexComponent().setup(testMeta.operatorContext); + + /* + * storage.setup() will create Spillable Data Structures + * storage.getSpillableComplexComponent().setup() will setup these Data Structures. + * So storage.setup() should be called before storage.getSpillableComplexComponent().setup() + */ storage.setup(testMeta.operatorContext); + storage.getSpillableComplexComponent().setup(testMeta.operatorContext); + sccImpl.beginWindow(1000); storage.put(window1, "x", 1);
