Repository: apex-malhar
Updated Branches:
  refs/heads/master 36582fff2 -> e082133a1


APEXMALHAR-2246 #resolve use Slice instead of byte[] in the underlying map of 
SpillableByteArrayListMultimapImpl


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e082133a
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e082133a
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e082133a

Branch: refs/heads/master
Commit: e082133a14c48d90274b130fc7a42e655c515979
Parents: 36582ff
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 19 17:17:57 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Sep 20 10:42:16 2016 -0700

----------------------------------------------------------------------
 .../SpillableByteArrayListMultimapImpl.java     | 31 +++++++++++++++-----
 .../SpillableByteArrayListMultimapImplTest.java |  8 ++---
 2 files changed, 28 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e082133a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
index 5c91350..c0466bd 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
@@ -26,7 +26,7 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
-import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
 import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
 import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
@@ -60,7 +60,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> 
implements Spillable.Spill
 
   private int batchSize = DEFAULT_BATCH_SIZE;
   @NotNull
-  private SpillableByteMapImpl<byte[], Integer> map;
+  private SpillableByteMapImpl<Slice, Integer> map;
   private SpillableStateStore store;
   private byte[] identifier;
   private long bucket;
@@ -91,7 +91,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> 
implements Spillable.Spill
     this.serdeKey = Preconditions.checkNotNull(serdeKey);
     this.serdeValue = Preconditions.checkNotNull(serdeValue);
 
-    map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruByteArraySliceSerde(), new SerdeIntSlice());
+    map = new SpillableByteMapImpl(store, identifier, bucket, new 
PassThruSliceSerde(), new SerdeIntSlice());
   }
 
   public SpillableStateStore getStore()
@@ -111,7 +111,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> 
implements Spillable.Spill
 
     if (spillableArrayList == null) {
       Slice keySlice = serdeKey.serialize(key);
-      Integer size = map.get(SliceUtils.concatenate(keySlice, 
SIZE_KEY_SUFFIX).toByteArray());
+      Integer size = map.get(SliceUtils.concatenate(keySlice, 
SIZE_KEY_SUFFIX));
 
       if (size == null) {
         return null;
@@ -166,6 +166,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> 
implements Spillable.Spill
   @Override
   public int size()
   {
+    // TODO: This is actually wrong since in a Multimap, size() should return 
the number of entries, not the number of distinct keys
     return map.size();
   }
 
@@ -179,7 +180,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> 
implements Spillable.Spill
   public boolean containsKey(@Nullable Object key)
   {
     return cache.contains((K)key) || 
map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
-        SIZE_KEY_SUFFIX).toByteArray());
+        SIZE_KEY_SUFFIX));
   }
 
   @Override
@@ -191,7 +192,23 @@ public class SpillableByteArrayListMultimapImpl<K, V> 
implements Spillable.Spill
   @Override
   public boolean containsEntry(@Nullable Object key, @Nullable Object value)
   {
-    throw new UnsupportedOperationException();
+    SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key);
+    if (spillableArrayList == null) {
+      return false;
+    }
+    for (int i = 0; i < spillableArrayList.size(); i++) {
+      V v = spillableArrayList.get(i);
+      if (v == null) {
+        if (value == null) {
+          return true;
+        }
+      } else {
+        if (v.equals(value)) {
+          return true;
+        }
+      }
+    }
+    return false;
   }
 
   @Override
@@ -275,7 +292,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> 
implements Spillable.Spill
       SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
       spillableArrayList.endWindow();
 
-      Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), 
SIZE_KEY_SUFFIX).toByteArray(),
+      Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), 
SIZE_KEY_SUFFIX),
           spillableArrayList.size());
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e082133a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
index 81063b8..2c9d7eb 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
@@ -239,7 +239,6 @@ public class SpillableByteArrayListMultimapImplTest
 
     map.endWindow();
     store.endWindow();
-    store.beforeCheckpoint(nextWindowId);
 
     return nextWindowId;
   }
@@ -258,12 +257,13 @@ public class SpillableByteArrayListMultimapImplTest
     long nextWindowId = 0L;
     nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
     long activationWindow = nextWindowId;
-    nextWindowId++;
-
+    store.beforeCheckpoint(nextWindowId);
     SpillableByteArrayListMultimapImpl<String, String> clonedMap = 
KryoCloneUtils.cloneObject(map);
     store.checkpointed(nextWindowId);
     store.committed(nextWindowId);
 
+    nextWindowId++;
+
     store.beginWindow(nextWindowId);
     map.beginWindow(nextWindowId);
 
@@ -319,7 +319,7 @@ public class SpillableByteArrayListMultimapImplTest
 
     store.setup(context);
     map.setup(context);
-
+    nextWindowId = activationWindow + 1;
     store.beginWindow(nextWindowId);
     map.beginWindow(nextWindowId);
 

Reply via email to