Repository: apex-malhar
Updated Branches:
  refs/heads/master 37991576d -> 6ddefd02a


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
new file mode 100644
index 0000000..fa4cd73
--- /dev/null
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+/**
+ * This is a stream which manages blocks and supports window related 
operations.
+ *
+ */
+public class WindowedBlockStream extends BlockStream implements 
WindowListener, WindowCompleteListener
+{
+  private static final Logger logger = 
LoggerFactory.getLogger(WindowedBlockStream.class);
+  /**
+   * Map from windowId to blockIds
+   */
+  protected SetMultimap<Long, Integer> windowToBlockIds = 
HashMultimap.create();
+
+  /**
+   * set of all free blockIds.
+   */
+  protected Set<Integer> freeBlockIds = Sets.newHashSet();
+
+  // max block index; must be >= 0
+  protected int maxBlockIndex = 0;
+
+  protected long currentWindowId;
+
+  /**
+   * This lock is used for adding/removing block(s)
+   */
+  protected transient ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  protected BlockReleaseStrategy releaseStrategy = new 
DefaultBlockReleaseStrategy();
+
+  public WindowedBlockStream()
+  {
+    super();
+  }
+
+  public WindowedBlockStream(int blockCapacity)
+  {
+    super(blockCapacity);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    moveToNextWindow();
+  }
+
+  /**
+   * make sure different windows will not share any blocks. Move to next block 
if
+   * current block is already used.
+   */
+  protected void moveToNextWindow()
+  {
+    //use current block if it hasn't be used, else, move to next block
+    Block block = getOrCreateCurrentBlock();
+    if (!block.isClear()) {
+      throw new RuntimeException("Current block not clear, should NOT move to 
next window. Please call toSlice() to output data first");
+    }
+    if (block.size() > 0) {
+      moveToNextBlock();
+    }
+    windowToBlockIds.put(currentWindowId, currentBlockIndex);
+  }
+
+  /**
+   * This method tries to use a free block first. Allocate a new block if there
+   * are no free blocks
+   *
+   * @return The previous block
+   */
+  @Override
+  protected Block moveToNextBlock()
+  {
+    lock.writeLock().lock();
+    try {
+      Block previousBlock = currentBlock;
+      if (!freeBlockIds.isEmpty()) {
+        currentBlockIndex = freeBlockIds.iterator().next();
+        freeBlockIds.remove(currentBlockIndex);
+        currentBlock = this.blocks.get(currentBlockIndex);
+      } else {
+        currentBlockIndex = ++maxBlockIndex;
+        currentBlock = getOrCreateCurrentBlock();
+      }
+      windowToBlockIds.put(currentWindowId, currentBlockIndex);
+      return previousBlock;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    releaseMemory();
+  }
+
+  @Override
+  public void completeWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      Set<Long> windIds = Sets.newHashSet(windowToBlockIds.keySet());
+      for (long windId : windIds) {
+        if (windId <= windowId) {
+          resetWindow(windId);
+        }
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  protected void resetWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      Set<Integer> removedBlockIds = windowToBlockIds.removeAll(windowId);
+
+      int removedSize = 0;
+      for (int blockId : removedBlockIds) {
+        removedSize += blocks.get(blockId).size();
+        Block theBlock = blocks.get(blockId);
+        theBlock.reset();
+        if (theBlock == currentBlock) {
+          //the client code could ask reset up to current window
+          //but the reset block should not be current block. current block 
should be reassigned.
+          moveToNextBlock();
+        }
+        logger.debug("reset block: {}, currentBlock: {}", blockId, theBlock);
+      }
+
+      freeBlockIds.addAll(removedBlockIds);
+      size -= removedSize;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void reset()
+  {
+    lock.writeLock().lock();
+    try {
+      super.reset();
+
+      //all blocks are free now except the current one
+      freeBlockIds.addAll(blocks.keySet());
+      freeBlockIds.remove(currentBlockIndex);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * The size of the data of all windows with id less than or equals to 
windowId
+   * @param windowId
+   * @return
+   */
+  public long dataSizeUpToWindow(long windowId)
+  {
+    lock.readLock().lock();
+    try {
+      long totalSize = 0;
+      for (long winId : windowToBlockIds.keySet()) {
+        totalSize += dataSizeOfWindow(winId);
+      }
+      return totalSize;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  protected long dataSizeOfWindow(long windowId)
+  {
+    lock.readLock().lock();
+    try {
+      long sizeOfWindow = 0;
+      Set<Integer> blockIds = windowToBlockIds.get(windowId);
+      if (blockIds != null) {
+        for (int blockId : blockIds) {
+          sizeOfWindow += blocks.get(blockId).size();
+        }
+      }
+      return sizeOfWindow;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public void releaseMemory()
+  {
+    /**
+     * report and release extra blocks
+     */
+    releaseStrategy.currentFreeBlocks(freeBlockIds.size());
+    int releasingBlocks = Math.min(releaseStrategy.getNumBlocksToRelease(), 
freeBlockIds.size());
+    int releasedBlocks = 0;
+    Iterator<Integer> iter = freeBlockIds.iterator();
+    while (releasedBlocks < releasingBlocks) {
+      //release blocks
+      int blockId = iter.next();
+      iter.remove();
+      blocks.remove(blockId);
+      releasedBlocks++;
+    }
+
+    /**
+     * report number of released blocks
+     */
+    if (releasedBlocks > 0) {
+      releaseStrategy.releasedBlocks(releasedBlocks);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index da44fb1..b88501e 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -34,7 +34,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * Spillable session windowed storage.
@@ -53,7 +52,7 @@ public class SpillableSessionWindowedStorage<K, V> extends 
SpillableWindowedKeye
     if (keyToWindowsMap == null) {
       // NOTE: this will pose difficulties when we try to assign the entries 
to a time bucket later on.
       // This is logged in APEXMALHAR-2271
-      keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, 
(Serde<Window.SessionWindow<K>, Slice>)(Serde)windowSerde);
+      keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, 
(Serde<Window.SessionWindow<K>>)(Serde)windowSerde);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index ac386ab..ef111b3 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -27,15 +27,14 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
 import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowedStorage;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * Implementation of WindowedKeyedStorage using {@link Spillable} data 
structures
@@ -48,10 +47,10 @@ public class SpillableWindowedKeyedStorage<K, V> implements 
WindowedStorage.Wind
   @NotNull
   protected SpillableComplexComponent scc;
   protected long bucket;
-  protected Serde<Window, Slice> windowSerde;
-  protected Serde<Pair<Window, K>, Slice> windowKeyPairSerde;
-  protected Serde<K, Slice> keySerde;
-  protected Serde<V, Slice> valueSerde;
+  protected Serde<Window> windowSerde;
+  protected Serde<Pair<Window, K>> windowKeyPairSerde;
+  protected Serde<K> keySerde;
+  protected Serde<V> valueSerde;
 
   protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap;
   protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
@@ -96,7 +95,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements 
WindowedStorage.Wind
   }
 
   public SpillableWindowedKeyedStorage(long bucket,
-      Serde<Window, Slice> windowSerde, Serde<Pair<Window, K>, Slice> 
windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde)
+      Serde<Window> windowSerde, Serde<Pair<Window, K>> windowKeyPairSerde, 
Serde<K> keySerde, Serde<V> valueSerde)
   {
     this.bucket = bucket;
     this.windowSerde = windowSerde;
@@ -120,17 +119,17 @@ public class SpillableWindowedKeyedStorage<K, V> 
implements WindowedStorage.Wind
     this.bucket = bucket;
   }
 
-  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  public void setWindowSerde(Serde<Window> windowSerde)
   {
     this.windowSerde = windowSerde;
   }
 
-  public void setWindowKeyPairSerde(Serde<Pair<Window, K>, Slice> 
windowKeyPairSerde)
+  public void setWindowKeyPairSerde(Serde<Pair<Window, K>> windowKeyPairSerde)
   {
     this.windowKeyPairSerde = windowKeyPairSerde;
   }
 
-  public void setValueSerde(Serde<V, Slice> valueSerde)
+  public void setValueSerde(Serde<V> valueSerde)
   {
     this.valueSerde = valueSerde;
   }
@@ -168,16 +167,16 @@ public class SpillableWindowedKeyedStorage<K, V> 
implements WindowedStorage.Wind
     }
     // set default serdes
     if (windowSerde == null) {
-      windowSerde = new SerdeKryoSlice<>();
+      windowSerde = new GenericSerde<>();
     }
     if (windowKeyPairSerde == null) {
-      windowKeyPairSerde = new SerdeKryoSlice<>();
+      windowKeyPairSerde = new GenericSerde<>();
     }
     if (keySerde == null) {
-      keySerde = new SerdeKryoSlice<>();
+      keySerde = new GenericSerde<>();
     }
     if (valueSerde == null) {
-      valueSerde = new SerdeKryoSlice<>();
+      valueSerde = new GenericSerde<>();
     }
 
     if (windowKeyToValueMap == null) {
@@ -220,5 +219,4 @@ public class SpillableWindowedKeyedStorage<K, V> implements 
WindowedStorage.Wind
   {
     return windowKeyToValueMap.get(new ImmutablePair<>(window, key));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index 6666381..9a8a291 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -24,13 +24,12 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
 import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowedStorage;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * This is an implementation of WindowedPlainStorage that makes use of {@link 
Spillable} data structures
@@ -42,8 +41,8 @@ public class SpillableWindowedPlainStorage<T> implements 
WindowedStorage.Windowe
   @NotNull
   private SpillableComplexComponent scc;
   private long bucket;
-  private Serde<Window, Slice> windowSerde;
-  private Serde<T, Slice> valueSerde;
+  private Serde<Window> windowSerde;
+  private Serde<T> valueSerde;
 
   protected Spillable.SpillableMap<Window, T> windowToDataMap;
 
@@ -51,7 +50,7 @@ public class SpillableWindowedPlainStorage<T> implements 
WindowedStorage.Windowe
   {
   }
 
-  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> 
windowSerde, Serde<T, Slice> valueSerde)
+  public SpillableWindowedPlainStorage(long bucket, Serde<Window> windowSerde, 
Serde<T> valueSerde)
   {
     this.bucket = bucket;
     this.windowSerde = windowSerde;
@@ -73,12 +72,12 @@ public class SpillableWindowedPlainStorage<T> implements 
WindowedStorage.Windowe
     this.bucket = bucket;
   }
 
-  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  public void setWindowSerde(Serde<Window> windowSerde)
   {
     this.windowSerde = windowSerde;
   }
 
-  public void setValueSerde(Serde<T, Slice> valueSerde)
+  public void setValueSerde(Serde<T> valueSerde)
   {
     this.valueSerde = valueSerde;
   }
@@ -128,10 +127,10 @@ public class SpillableWindowedPlainStorage<T> implements 
WindowedStorage.Windowe
     }
     // set default serdes
     if (windowSerde == null) {
-      windowSerde = new SerdeKryoSlice<>();
+      windowSerde = new GenericSerde<>();
     }
     if (valueSerde == null) {
-      valueSerde = new SerdeKryoSlice<>();
+      valueSerde = new GenericSerde<>();
     }
     if (windowToDataMap == null) {
       windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde);
@@ -142,5 +141,4 @@ public class SpillableWindowedPlainStorage<T> implements 
WindowedStorage.Windowe
   public void teardown()
   {
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java 
b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 403072d..92937a9 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
 import org.apache.commons.io.FileUtils;
 
 import com.google.common.base.Preconditions;
@@ -57,7 +58,7 @@ public class TestUtils
 
   public static Slice getSlice(int val)
   {
-    return new Slice(getBytes(val));
+    return new BufferSlice(getBytes(val));
   }
 
   public static class TestInfo extends TestWatcher

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 2058b69..6645a98 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -28,6 +28,12 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource;
+import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
 import com.google.common.primitives.Longs;
 
 import com.datatorrent.lib.fileaccess.FileAccess;
@@ -82,6 +88,7 @@ public class DefaultBucketTest
     Assert.assertNull("value not present", value);
 
     Assert.assertEquals("size of bucket", one.length * 2 + Longs.BYTES, 
testMeta.defaultBucket.getSizeInBytes());
+
     testMeta.defaultBucket.teardown();
   }
 
@@ -126,7 +133,6 @@ public class DefaultBucketTest
     Slice one = ManagedStateTestUtils.getSliceFor("1");
     testPut();
     Map<Slice, Bucket.BucketedValue> unsaved = 
testMeta.defaultBucket.checkpoint(10);
-    Assert.assertEquals("size", 1, unsaved.size());
 
     Map.Entry<Slice, Bucket.BucketedValue> entry = 
unsaved.entrySet().iterator().next();
     Assert.assertEquals("key", one, entry.getKey());
@@ -192,15 +198,39 @@ public class DefaultBucketTest
     testGetFromReader();
     long initSize = testMeta.defaultBucket.getSizeInBytes();
 
-    Slice two = ManagedStateTestUtils.getSliceFor("2");
-    testMeta.defaultBucket.put(two, 101, two);
-
-    Assert.assertEquals("size", initSize + (two.length * 2 + Longs.BYTES ), 
testMeta.defaultBucket.getSizeInBytes());
+    //The temporary memory generated by get was not managed by bucket, only 
put was managed by bucket
+    SerializationBuffer buffer = new 
SerializationBuffer(testMeta.defaultBucket.getKeyStream());
+    byte[] keyPrefix = new byte[]{0};
+    String key = "1";
+    String value = "2";
+    AffixSerde<String> keySerde = new AffixSerde<>(keyPrefix, new 
StringSerde(), null);
+
+    StringSerde valueSerde = new StringSerde();
+
+    testMeta.defaultBucket.getKeyStream().beginWindow(1);
+    testMeta.defaultBucket.getValueStream().beginWindow(1);
+    keySerde.serialize(key, buffer);
+    Slice keySlice = buffer.toSlice();
+    valueSerde.serialize(value, buffer);
+    Slice valueSlice = buffer.toSlice();
+    testMeta.defaultBucket.put(keySlice, 1, valueSlice);
+    testMeta.defaultBucket.getKeyStream().endWindow();
+    testMeta.defaultBucket.getValueStream().endWindow();
+
+    long currentSize = testMeta.defaultBucket.getSizeInBytes();
+    testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
+    //call this method to invoke the release memory
+    testMeta.defaultBucket.get(keySlice, -1, ReadSource.MEMORY);
+    long sizeFreed = currentSize - testMeta.defaultBucket.getSizeInBytes();
+
+    SerializationBuffer tmpBuffer = new SerializationBuffer(new 
WindowedBlockStream());
+    tmpBuffer.writeBytes(keyPrefix);
+    tmpBuffer.writeString(key);
+    tmpBuffer.writeString(value);
+    int expectedFreedSize = tmpBuffer.toSlice().toByteArray().length; //key 
prefix, key length, key; value length, value
+    Assert.assertEquals("size freed", expectedFreedSize, sizeFreed);
+    Assert.assertEquals("existing size", currentSize - expectedFreedSize, 
testMeta.defaultBucket.getSizeInBytes());
 
-    long sizeFreed = testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
-    Assert.assertEquals("size freed", initSize, sizeFreed);
-    Assert.assertEquals("existing size", (two.length * 2 + Longs.BYTES), 
testMeta.defaultBucket.getSizeInBytes());
     testMeta.defaultBucket.teardown();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index 0d3f87a..86f8430 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
 
@@ -128,6 +129,6 @@ public class ManagedStateTestUtils
 
   public static Slice getSliceFor(String x)
   {
-    return new Slice(x.getBytes());
+    return new BufferSlice(x.getBytes());
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
index af05c88..5dd6404 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
@@ -23,7 +23,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.google.common.collect.Lists;
 
@@ -58,7 +58,7 @@ public class SpillableArrayListImplTest
   public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
   {
     SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, 
ID1, store,
-        new SerdeStringSlice(), 1);
+        new StringSerde(), 1);
 
     store.setup(testMeta.operatorContext);
     list.setup(testMeta.operatorContext);
@@ -177,7 +177,7 @@ public class SpillableArrayListImplTest
   private void simpleAddGetAndSetTest3Helper(SpillableStateStore store)
   {
     SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, 
ID1, store,
-        new SerdeStringSlice(), 3);
+        new StringSerde(), 3);
 
     store.setup(testMeta.operatorContext);
     list.setup(testMeta.operatorContext);
@@ -321,10 +321,10 @@ public class SpillableArrayListImplTest
   public void simpleMultiListTestHelper(SpillableStateStore store)
   {
     SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, 
ID1, store,
-        new SerdeStringSlice(), 1);
+        new StringSerde(), 1);
 
     SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, 
ID2, store,
-        new SerdeStringSlice(), 1);
+        new StringSerde(), 1);
 
     store.setup(testMeta.operatorContext);
     list1.setup(testMeta.operatorContext);
@@ -483,7 +483,7 @@ public class SpillableArrayListImplTest
     SpillableStateStore store = testMeta.store;
 
     SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, 
ID1, store,
-        new SerdeStringSlice(), 3);
+        new StringSerde(), 3);
 
     store.setup(testMeta.operatorContext);
     list.setup(testMeta.operatorContext);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
index 82fb340..d21bf50 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
@@ -26,9 +26,11 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
 import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
 
 import com.google.common.collect.Lists;
 
@@ -63,8 +65,8 @@ public class SpillableArrayListMultimapImplTest
   public void simpleMultiKeyTestHelper(SpillableStateStore store)
   {
     SpillableArrayListMultimapImpl<String, String> map =
-        new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new 
SerdeStringSlice(),
-        new SerdeStringSlice());
+        new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new 
StringSerde(),
+        new StringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -112,11 +114,11 @@ public class SpillableArrayListMultimapImplTest
   public long simpleMultiKeyTestHelper(SpillableStateStore store,
       SpillableArrayListMultimapImpl<String, String> map, String key, long 
nextWindowId)
   {
-    SerdeStringSlice serdeString = new SerdeStringSlice();
-    SerdeIntSlice serdeInt = new SerdeIntSlice();
-
-    Slice keySlice = serdeString.serialize(key);
-
+    StringSerde serdeString = new StringSerde();
+    IntSerde serdeInt = new IntSerde();
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    serdeString.serialize(key, buffer);
+    Slice keySlice = buffer.toSlice();
     byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
 
     nextWindowId++;
@@ -249,7 +251,7 @@ public class SpillableArrayListMultimapImplTest
     SpillableStateStore store = testMeta.store;
 
     SpillableArrayListMultimapImpl<String, String> map =
-        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new 
SerdeStringSlice(), new SerdeStringSlice());
+        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new 
StringSerde(), new StringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -323,8 +325,10 @@ public class SpillableArrayListMultimapImplTest
     store.beginWindow(nextWindowId);
     map.beginWindow(nextWindowId);
 
-    SerdeStringSlice serdeString = new SerdeStringSlice();
-    Slice keySlice = serdeString.serialize("a");
+    StringSerde serdeString = new StringSerde();
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    serdeString.serialize("a", buffer);
+    Slice keySlice = buffer.toSlice();
     byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
 
     SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, 
Lists.<String>newArrayList("a", "111", "b", "222", "d",
@@ -350,7 +354,7 @@ public class SpillableArrayListMultimapImplTest
 
     SpillableStateStore store = testMeta.store;
     SpillableArrayListMultimapImpl<String, String> multimap = new 
SpillableArrayListMultimapImpl<>(
-        this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new 
SerdeStringSlice());
+        this.testMeta.store, ID1, 0L, new StringSerde(), new StringSerde());
 
     Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
     attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
index 5c477b1..29c2090 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
@@ -22,7 +22,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 public class SpillableComplexComponentImplTest
 {
@@ -48,9 +48,9 @@ public class SpillableComplexComponentImplTest
     SpillableComplexComponentImpl sccImpl = new 
SpillableComplexComponentImpl(store);
 
     Spillable.SpillableComponent scList =
-        (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new 
SerdeStringSlice());
+        (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new 
StringSerde());
     Spillable.SpillableComponent scMap =
-        (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new 
SerdeStringSlice(), new SerdeStringSlice());
+        (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new 
StringSerde(), new StringSerde());
 
     sccImpl.setup(testMeta.operatorContext);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index e8aea46..a96a8fd 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -23,7 +23,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
@@ -47,6 +47,7 @@ public class SpillableMapImplTest
     simpleGetAndPutTestHelper(store);
   }
 
+
   @Test
   public void simpleGetAndPutManagedStateTest()
   {
@@ -55,11 +56,7 @@ public class SpillableMapImplTest
 
   private void simpleGetAndPutTestHelper(SpillableStateStore store)
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
-
-    SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 
0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+    SpillableMapImpl<String, String> map = createSpillableMap(store);
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -157,23 +154,25 @@ public class SpillableMapImplTest
   public void simpleRemoveTest()
   {
     InMemSpillableStateStore store = new InMemSpillableStateStore();
-
     simpleRemoveTestHelper(store);
   }
 
+
   @Test
   public void simpleRemoveManagedStateTest()
   {
     simpleRemoveTestHelper(testMeta.store);
   }
 
-  private void simpleRemoveTestHelper(SpillableStateStore store)
+  protected SpillableMapImpl<String, String> 
createSpillableMap(SpillableStateStore store)
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
+    return new SpillableMapImpl<String, String>(store, ID1, 0L, new 
StringSerde(),
+        new StringSerde());
+  }
 
-    SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 
0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+  private void simpleRemoveTestHelper(SpillableStateStore store)
+  {
+    SpillableMapImpl<String, String> map = createSpillableMap(store);
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -324,14 +323,14 @@ public class SpillableMapImplTest
 
   public void multiMapPerBucketTestHelper(SpillableStateStore store)
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
+    StringSerde sss = new StringSerde();
 
     SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 
0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new StringSerde(),
+        new StringSerde());
     SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 
0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new StringSerde(),
+        new StringSerde());
 
     store.setup(testMeta.operatorContext);
     map1.setup(testMeta.operatorContext);
@@ -413,11 +412,11 @@ public class SpillableMapImplTest
   @Test
   public void recoveryWithManagedStateTest() throws Exception
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
+    StringSerde sss = new StringSerde();
 
     SpillableMapImpl<String, String> map1 = new 
SpillableMapImpl<>(testMeta.store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new StringSerde(),
+        new StringSerde());
 
     testMeta.store.setup(testMeta.operatorContext);
     map1.setup(testMeta.operatorContext);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
index 3883191..d0343e1 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
@@ -26,7 +26,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.google.common.collect.Lists;
 
@@ -53,7 +53,7 @@ public class SpillableSetImplTest
 
   public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
   {
-    SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new 
SerdeStringSlice());
+    SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new 
StringSerde());
 
     store.setup(testMeta.operatorContext);
     set.setup(testMeta.operatorContext);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
index 15970af..2f80628 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -27,7 +27,8 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -62,8 +63,7 @@ public class SpillableSetMultimapImplTest
   public void simpleMultiKeyTestHelper(SpillableStateStore store)
   {
     SpillableSetMultimapImpl<String, String> map =
-        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), 
createStringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -201,7 +201,7 @@ public class SpillableSetMultimapImplTest
     SpillableStateStore store = testMeta.store;
 
     SpillableSetMultimapImpl<String, String> map =
-        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), 
new SerdeStringSlice());
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), 
createStringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -276,8 +276,9 @@ public class SpillableSetMultimapImplTest
     final int numOfEntry = 100000;
 
     SpillableStateStore store = testMeta.store;
-    SpillableSetMultimapImpl<String, String> multimap = new 
SpillableSetMultimapImpl<>(
-        this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new 
SerdeStringSlice());
+
+    SpillableSetMultimapImpl<String, String> multimap = new 
SpillableSetMultimapImpl<>(testMeta.store, ID1, 0L,
+        createStringSerde(), createStringSerde());
 
     Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
     attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
@@ -294,4 +295,9 @@ public class SpillableSetMultimapImplTest
     multimap.endWindow();
     store.endWindow();
   }
+
+  protected Serde<String> createStringSerde()
+  {
+    return new StringSerde();
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
index 36e3557..d72b1f9 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -27,14 +27,15 @@ import org.junit.runner.Description;
 
 import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
 import 
org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
+import com.esotericsoftware.kryo.io.Input;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
 import com.datatorrent.lib.util.TestUtils;
 import com.datatorrent.netlet.util.Slice;
@@ -44,9 +45,9 @@ import com.datatorrent.netlet.util.Slice;
  */
 public class SpillableTestUtils
 {
-  public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice();
-  public static SerdeCollectionSlice<String, List<String>> 
SERDE_STRING_LIST_SLICE = new SerdeCollectionSlice<>(new SerdeStringSlice(),
-      (Class<List<String>>)(Class)ArrayList.class);
+  public static StringSerde STRING_SERDE = new StringSerde();
+  public static CollectionSerde<String, List<String>> STRING_LIST_SERDE = new 
CollectionSerde<>(new StringSerde(),
+      (Class)ArrayList.class);
 
   private SpillableTestUtils()
   {
@@ -77,34 +78,41 @@ public class SpillableTestUtils
     }
   }
 
+  protected static SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+
   public static Slice getKeySlice(byte[] id, String key)
   {
-    return SliceUtils.concatenate(id, SERDE_STRING_SLICE.serialize(key));
+    buffer.writeBytes(id);
+    STRING_SERDE.serialize(key, buffer);
+    return buffer.toSlice();
   }
 
   public static Slice getKeySlice(byte[] id, int index, String key)
   {
-    return SliceUtils.concatenate(id,
-        SliceUtils.concatenate(GPOUtils.serializeInt(index),
-            SERDE_STRING_SLICE.serialize(key)));
+    buffer.writeBytes(id);
+    buffer.writeInt(index);
+    STRING_SERDE.serialize(key, buffer);
+    return buffer.toSlice();
   }
 
   public static void checkValue(SpillableStateStore store, long bucketId, 
String key,
       byte[] prefix, String expectedValue)
   {
-    checkValue(store, bucketId, SliceUtils.concatenate(prefix, 
SERDE_STRING_SLICE.serialize(key)).buffer,
-        expectedValue, 0, SERDE_STRING_SLICE);
+    buffer.writeBytes(prefix);
+    STRING_SERDE.serialize(key, buffer);
+    checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 
0, STRING_SERDE);
   }
 
   public static void checkValue(SpillableStateStore store, long bucketId,
       byte[] prefix, int index, List<String> expectedValue)
   {
-    checkValue(store, bucketId, SliceUtils.concatenate(prefix, 
GPOUtils.serializeInt(index)), expectedValue, 0,
-        SERDE_STRING_LIST_SLICE);
+    buffer.writeBytes(prefix);
+    buffer.writeInt(index);
+    checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 
0, STRING_LIST_SERDE);
   }
 
-  public static <T> void  checkValue(SpillableStateStore store, long bucketId, 
byte[] bytes,
-      T expectedValue, int offset, Serde<T, Slice> serde)
+  public static <T> void checkValue(SpillableStateStore store, long bucketId, 
byte[] bytes,
+      T expectedValue, int offset, Serde<T> serde)
   {
     Slice slice = store.getSync(bucketId, new Slice(bytes));
 
@@ -116,7 +124,7 @@ public class SpillableTestUtils
       }
     }
 
-    T string = serde.deserialize(slice, new MutableInt(offset));
+    T string = serde.deserialize(new Input(slice.buffer, slice.offset + 
offset, slice.length));
 
     Assert.assertEquals(expectedValue, string);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
index 8033a7d..a2cbb54 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
@@ -25,9 +25,6 @@ import org.junit.Test;
 
 import com.google.common.collect.Sets;
 
-/**
- * Created by tfarkas on 6/4/16.
- */
 public class TimeBasedPriorityQueueTest
 {
   @Test

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
new file mode 100644
index 0000000..007fab9
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class AffixSerdeTest
+{
+  @Test
+  public void simpleTest()
+  {
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    AffixSerde<String> serde = new AffixSerde<>(new byte[]{1, 2, 3}, new 
StringSerde(), new byte[]{9});
+
+    final String orgValue = "abc";
+    serde.serialize(orgValue, buffer);
+    Slice slice = buffer.toSlice();
+
+    String value = serde.deserialize(new Input(slice.buffer, slice.offset, 
slice.length));
+    Assert.assertEquals(orgValue, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
new file mode 100644
index 0000000..3b39d6c
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class BlockStreamTest
+{
+  protected Random random = new Random();
+
+  @Test
+  public void testWindowedBlockStream()
+  {
+    WindowedBlockStream bs = new WindowedBlockStream();
+    List<byte[]> totalList = Lists.newArrayList();
+    List<Slice> slices = Lists.newArrayList();
+
+    for (int windowId = 0; windowId < 10; ++windowId) {
+      List<byte[]> list = generateList();
+      totalList.addAll(list);
+
+      bs.beginWindow(windowId);
+      writeToBlockStream(bs, list, slices);
+      bs.endWindow();
+
+      if (windowId % 2 != 0) {
+        verify(totalList, slices);
+
+        bs.completeWindow(windowId);
+        totalList.clear();
+        slices.clear();
+      }
+    }
+  }
+
+  @Test
+  public void testBlockStream()
+  {
+    BlockStream bs = new BlockStream();
+    List<byte[]> totalList = Lists.newArrayList();
+    List<Slice> slices = Lists.newArrayList();
+
+    for (int tryTime = 0; tryTime < 10; ++tryTime) {
+      List<byte[]> list = generateList();
+      totalList.addAll(list);
+
+      writeToBlockStream(bs, list, slices);
+
+      if (tryTime % 2 != 0) {
+        verify(totalList, slices);
+
+        bs.reset();
+        totalList.clear();
+        slices.clear();
+      }
+
+    }
+  }
+
+  private void writeToBlockStream(BlockStream bs, List<byte[]> list, 
List<Slice> slices)
+  {
+    for (byte[] bytes : list) {
+      int times = random.nextInt(100) + 1;
+      int remainLen = bytes.length;
+      int offset = 0;
+      while (times > 0 && remainLen > 0) {
+        int avgSubLen = remainLen / times;
+        times--;
+        if (avgSubLen == 0) {
+          bs.write(bytes, offset, remainLen);
+          break;
+        }
+
+        int writeLen = remainLen;
+        if (times != 0) {
+          int subLen = random.nextInt(avgSubLen * 2);
+          writeLen = Math.min(subLen, remainLen);
+        }
+        bs.write(bytes, offset, writeLen);
+
+        offset += writeLen;
+        remainLen -= writeLen;
+      }
+      slices.add(bs.toSlice());
+    }
+  }
+
+  private void verify(List<byte[]> list, List<Slice> slices)
+  {
+    //verify
+    Assert.assertTrue("size not equal.", list.size() == slices.size());
+
+    for (int i = 0; i < list.size(); ++i) {
+      byte[] bytes = list.get(i);
+      byte[] newBytes = slices.get(i).toByteArray();
+      if (!Arrays.equals(bytes, newBytes)) {
+        Assert.assertArrayEquals(bytes, newBytes);
+      }
+    }
+  }
+
+  private List<byte[]> generateList()
+  {
+    List<byte[]> list = Lists.newArrayList();
+    int size = random.nextInt(10000) + 1;
+    for (int i = 0; i < size; i++) {
+      list.add(generateByteArray());
+    }
+    return list;
+  }
+
+  protected byte[] generateByteArray()
+  {
+    int len = random.nextInt(10000) + 1;
+    byte[] bytes = new byte[len];
+    random.nextBytes(bytes);
+    return bytes;
+  }
+
+
+  @Test
+  public void testReleaseMemory()
+  {
+    WindowedBlockStream stream = new WindowedBlockStream();
+
+    byte[] data = new byte[2048];
+    final int loopPerWindow = 100;
+    long windowId = 0;
+
+    //fill data;
+    for (; windowId < 100; ++windowId) {
+      stream.beginWindow(windowId);
+      for (int i = 0; i < loopPerWindow; ++i) {
+        stream.write(data);
+        stream.toSlice();
+      }
+      stream.endWindow();
+    }
+
+    long capacity = stream.capacity();
+    stream.completeWindow(windowId);
+    Assert.assertTrue(capacity == stream.capacity());
+    Assert.assertTrue(0 == stream.size());
+
+    //release memory;
+    for (; windowId < 200; ++windowId) {
+      stream.beginWindow(windowId);
+      stream.endWindow();
+    }
+
+    //at least keep one block as current block
+    Assert.assertTrue(stream.capacity() == Block.DEFAULT_BLOCK_SIZE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
new file mode 100644
index 0000000..255d9c0
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class CollectionSerdeTest
+{
+  @Test
+  public void testSerdeList()
+  {
+    CollectionSerde<String, List<String>> serdeList =
+        new CollectionSerde<>(new StringSerde(), (Class)ArrayList.class);
+
+    List<String> stringList = Lists.newArrayList("a", "b", "c");
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    serdeList.serialize(stringList, buffer);
+
+    Slice slice = buffer.toSlice();
+    List<String> deserializedList = serdeList.deserialize(new 
Input(slice.buffer, slice.offset, slice.length));
+
+    Assert.assertEquals(stringList, deserializedList);
+  }
+
+  @Test
+  public void testSerdeSet()
+  {
+    CollectionSerde<String, Set<String>> serdeSet =
+        new CollectionSerde<>(new StringSerde(), (Class)HashSet.class);
+
+    Set<String> stringList = Sets.newHashSet("a", "b", "c");
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    serdeSet.serialize(stringList, buffer);
+
+    Slice slice = buffer.toSlice();
+    Set<String> deserializedSet = serdeSet.deserialize(new Input(slice.buffer, 
slice.offset, slice.length));
+
+    Assert.assertEquals(stringList, deserializedSet);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
new file mode 100644
index 0000000..34b7088
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * SerdeKryoSlice unit tests
+ */
+public class GenericSerdeTest
+{
+  public static class TestPojo
+  {
+    private TestPojo()
+    {
+    }
+
+    public TestPojo(int intValue, String stringValue)
+    {
+      this.intValue = intValue;
+      this.stringValue = stringValue;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+      TestPojo o = (TestPojo)other;
+      return intValue == o.intValue && stringValue.equals(o.stringValue);
+    }
+
+    int intValue;
+    String stringValue;
+  }
+
+  @Test
+  public void stringListTest()
+  {
+    GenericSerde<ArrayList> serdeList = new GenericSerde<>(ArrayList.class);
+
+    ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    serdeList.serialize(stringList, buffer);
+    Slice slice = buffer.toSlice();
+    List<String> deserializedList = serdeList.deserialize(new 
Input(slice.buffer, slice.offset, slice.length));
+    Assert.assertEquals(stringList, deserializedList);
+  }
+
+  @Test
+  public void pojoTest()
+  {
+    GenericSerde<TestPojo> serdePojo = new GenericSerde<>();
+    TestPojo pojo = new TestPojo(345, "xyz");
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    serdePojo.serialize(pojo, buffer);
+    Slice slice = buffer.toSlice();
+    TestPojo deserializedPojo = serdePojo.deserialize(new Input(slice.buffer, 
slice.offset, slice.length));
+    Assert.assertEquals(pojo, deserializedPojo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
new file mode 100644
index 0000000..104ff04
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class PairSerdeTest
+{
+  @Test
+  public void simpleSerdeTest()
+  {
+    PairSerde<String, Integer> serdePair = new PairSerde<>(new StringSerde(), 
new IntSerde());
+
+    Pair<String, Integer> pair = new ImmutablePair<>("abc", 123);
+
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    serdePair.serialize(pair, buffer);
+    Slice slice = buffer.toSlice();
+
+    Pair<String, Integer> deserializedPair = serdePair.deserialize(new 
Input(slice.buffer, slice.offset, slice.length));
+
+    Assert.assertEquals(pair, deserializedPair);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
deleted file mode 100644
index 3cb5b65..0000000
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-public class PassThruByteArraySerdeTest
-{
-  @Rule
-  public SerdeByteArrayToByteArrayTestWatcher testMeta = new 
SerdeByteArrayToByteArrayTestWatcher();
-
-  public static class SerdeByteArrayToByteArrayTestWatcher extends TestWatcher
-  {
-    public PassThruByteArraySerde serde;
-
-    @Override
-    protected void starting(Description description)
-    {
-      this.serde = new PassThruByteArraySerde();
-      super.starting(description);
-    }
-  }
-
-  @Test
-  public void simpleSerializeTest()
-  {
-    byte[] byteArray = new byte[]{1, 2, 3};
-    byte[] serialized = testMeta.serde.serialize(byteArray);
-
-    Assert.assertArrayEquals(byteArray, serialized);
-  }
-
-  @Test
-  public void simpleDeserializeTest()
-  {
-    byte[] byteArray = new byte[]{1, 2, 3};
-    byte[] serialized = testMeta.serde.deserialize(byteArray);
-
-    Assert.assertArrayEquals(byteArray, serialized);
-  }
-
-  @Test
-  public void simpleDeserializeOffsetTest()
-  {
-    byte[] byteArray = new byte[]{1, 2, 3};
-    byte[] serialized = testMeta.serde.deserialize(byteArray, new 
MutableInt(0));
-
-    Assert.assertArrayEquals(byteArray, serialized);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
deleted file mode 100644
index f6085f6..0000000
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdeCollectionSliceTest
-{
-  @Test
-  public void testSerdeList()
-  {
-    SerdeCollectionSlice<String, List<String>> serdeList =
-        new SerdeCollectionSlice<>(new SerdeStringSlice(), 
(Class<List<String>>)(Class)ArrayList.class);
-
-    List<String> stringList = Lists.newArrayList("a", "b", "c");
-
-    Slice slice = serdeList.serialize(stringList);
-
-    List<String> deserializedList = serdeList.deserialize(slice);
-
-    Assert.assertEquals(stringList, deserializedList);
-  }
-
-  @Test
-  public void testSerdeSet()
-  {
-    SerdeCollectionSlice<String, Set<String>> serdeSet =
-        new SerdeCollectionSlice<>(new SerdeStringSlice(), 
(Class<Set<String>>)(Class)HashSet.class);
-
-    Set<String> stringList = Sets.newHashSet("a", "b", "c");
-
-    Slice slice = serdeSet.serialize(stringList);
-
-    Set<String> deserializedSet = serdeSet.deserialize(slice);
-
-    Assert.assertEquals(stringList, deserializedSet);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
new file mode 100644
index 0000000..ee24557
--- /dev/null
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerdeGeneralTest
+{
+  private final int charNum = 62;
+  private String[] testData = null;
+  private final Random random = new Random();
+
+  @Before
+  public void generateTestData()
+  {
+    int size = random.nextInt(10000) + 1;
+    testData = new String[size];
+    for (int i = 0; i < size; ++i) {
+      char[] chars = new char[random.nextInt(10000) + 1];
+      for (int j = 0; j < chars.length; ++j) {
+        chars[j] = getRandomChar();
+      }
+
+      testData[i] = new String(chars);
+    }
+  }
+
+  private char getRandomChar()
+  {
+    int value = random.nextInt(62);
+    if (value < 10) {
+      return (char)(value + '0');
+    } else if (value < 36) {
+      return (char)(value + 'A');
+    }
+    return (char)(value + 'a');
+  }
+
+  @Test
+  public void testSerdeInt()
+  {
+    IntSerde intSerde = new IntSerde();
+
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+    int value = 123;
+    intSerde.serialize(value, buffer);
+
+    Slice slice = buffer.toSlice();
+
+    int deserializedValue = intSerde.deserialize(new Input(slice.buffer, 
slice.offset, slice.length));
+
+    Assert.assertEquals(value, deserializedValue);
+  }
+
+  @Test
+  public void testSerdeString()
+  {
+    testSerde(testData, new StringSerde(), new StringSerdeVerifier());
+  }
+
+  @Test
+  public void testSerdeArray()
+  {
+    testSerde(testData, ArraySerde.newSerde(new StringSerde(), String.class), 
new StringArraySerdeVerifier());
+  }
+
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testSerdeCollection()
+  {
+    CollectionSerde<String, List<String>> listSerde = new 
CollectionSerde<>(new StringSerde(), (Class)ArrayList.class);
+    testSerde(testData, listSerde, new StringListSerdeVerifier());
+  }
+
+
+  public <T> void testSerde(String[] strs, Serde<T> serde, SerdeVerifier<T> 
verifier)
+  {
+    SerializationBuffer buffer = new SerializationBuffer(new 
WindowedBlockStream());
+
+    for (int i = 0; i < 10; ++i) {
+      buffer.beginWindow(i);
+      verifier.verifySerde(strs, serde, buffer);
+      buffer.endWindow();
+      if (i % 3 == 0) {
+        buffer.completeWindow(i);
+      }
+      if (i % 4 == 0) {
+        buffer.reset();
+      }
+    }
+    buffer.release();
+  }
+
+  public interface SerdeVerifier<T>
+  {
+    void verifySerde(String[] datas, Serde<T> serde, SerializationBuffer 
buffer);
+  }
+
+  public static class StringSerdeVerifier implements SerdeVerifier<String>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<String> serde, 
SerializationBuffer buffer)
+    {
+      for (String str : datas) {
+        serde.serialize(str, buffer);
+        Slice slice = buffer.toSlice();
+        Assert.assertTrue("serialize failed, String: " + str, 
str.equals(serde.deserialize(new Input(slice.buffer, slice.offset, 
slice.length))));
+      }
+    }
+  }
+
+
+  public static class StringArraySerdeVerifier implements 
SerdeVerifier<String[]>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<String[]> serde, 
SerializationBuffer buffer)
+    {
+      serde.serialize(datas, buffer);
+      Slice slice = buffer.toSlice();
+      String[] newStrs = serde.deserialize(new Input(slice.buffer, 
slice.offset, slice.length));
+      Assert.assertArrayEquals("serialize array failed.", datas, newStrs);
+    }
+  }
+
+  public static class StringListSerdeVerifier implements 
SerdeVerifier<List<String>>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<List<String>> serdeList, 
SerializationBuffer buffer)
+    {
+      List<String> list = Arrays.asList(datas);
+
+      serdeList.serialize(list, buffer);
+      Slice slice = buffer.toSlice();
+      List<String> newStrs = serdeList.deserialize(new Input(slice.buffer, 
slice.offset, slice.length));
+      Assert.assertArrayEquals("serialize list failed.", datas, 
newStrs.toArray(new String[0]));
+
+      buffer.reset();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
deleted file mode 100644
index b780f66..0000000
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * SerdeKryoSlice unit tests
- */
-public class SerdeKryoSliceTest
-{
-  public static class TestPojo
-  {
-    private TestPojo()
-    {
-    }
-
-    public TestPojo(int intValue, String stringValue)
-    {
-      this.intValue = intValue;
-      this.stringValue = stringValue;
-    }
-
-    @Override
-    public boolean equals(Object other)
-    {
-      TestPojo o = (TestPojo)other;
-      return intValue == o.intValue && stringValue.equals(o.stringValue);
-    }
-
-    int intValue;
-    String stringValue;
-  }
-
-  @Test
-  public void stringListTest()
-  {
-    SerdeKryoSlice<ArrayList> serdeList = new 
SerdeKryoSlice<>(ArrayList.class);
-
-    ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
-    Slice slice = serdeList.serialize(stringList);
-    List<String> deserializedList = serdeList.deserialize(slice);
-    Assert.assertEquals(stringList, deserializedList);
-  }
-
-  @Test
-  public void pojoTest()
-  {
-    SerdeKryoSlice<TestPojo> serdePojo = new SerdeKryoSlice<>();
-    TestPojo pojo = new TestPojo(345, "xyz");
-    Slice slice = serdePojo.serialize(pojo);
-    TestPojo deserializedPojo = serdePojo.deserialize(slice);
-    Assert.assertEquals(pojo, deserializedPojo);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
deleted file mode 100644
index 6684a9f..0000000
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdePairSliceTest
-{
-  @Test
-  public void simpleSerdeTest()
-  {
-    SerdePairSlice<String, Integer> serdePair = new SerdePairSlice<>(new 
SerdeStringSlice(), new SerdeIntSlice());
-
-    Pair<String, Integer> pair = new ImmutablePair<>("abc", 123);
-
-    Slice slice = serdePair.serialize(pair);
-
-    Pair<String, Integer> deserializedPair = serdePair.deserialize(slice);
-
-    Assert.assertEquals(pair, deserializedPair);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
index 3b7789c..a44e454 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
@@ -47,8 +47,14 @@ public class SpillableWindowedStorageTest
     Window window2 = new Window.TimeWindow<>(1010, 10);
     Window window3 = new Window.TimeWindow<>(1020, 10);
     storage.setSpillableComplexComponent(sccImpl);
-    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
+    /*
+     * storage.setup() will create Spillable Data Structures
+     * storage.getSpillableComplexComponent().setup() will setup these Data 
Structures.
+     * So storage.setup() should be called before 
storage.getSpillableComplexComponent().setup()
+     */
     storage.setup(testMeta.operatorContext);
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
 
     sccImpl.beginWindow(1000);
     storage.put(window1, 1);
@@ -103,8 +109,15 @@ public class SpillableWindowedStorageTest
     Window window2 = new Window.TimeWindow<>(1010, 10);
     Window window3 = new Window.TimeWindow<>(1020, 10);
     storage.setSpillableComplexComponent(sccImpl);
-    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
+    /*
+     * storage.setup() will create Spillable Data Structures
+     * storage.getSpillableComplexComponent().setup() will setup these Data 
Structures.
+     * So storage.setup() should be called before 
storage.getSpillableComplexComponent().setup()
+     */
     storage.setup(testMeta.operatorContext);
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
 
     sccImpl.beginWindow(1000);
     storage.put(window1, "x", 1);

Reply via email to