Repository: apex-malhar Updated Branches: refs/heads/master 36582fff2 -> e082133a1
APEXMALHAR-2246 #resolve use Slice instead of byte[] in the underlying map of SpillableByteArrayListMultimapImpl Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e082133a Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e082133a Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e082133a Branch: refs/heads/master Commit: e082133a14c48d90274b130fc7a42e655c515979 Parents: 36582ff Author: David Yan <[email protected]> Authored: Mon Sep 19 17:17:57 2016 -0700 Committer: David Yan <[email protected]> Committed: Tue Sep 20 10:42:16 2016 -0700 ---------------------------------------------------------------------- .../SpillableByteArrayListMultimapImpl.java | 31 +++++++++++++++----- .../SpillableByteArrayListMultimapImplTest.java | 8 ++--- 2 files changed, 28 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e082133a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java index 5c91350..c0466bd 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java @@ -26,7 +26,7 @@ import java.util.Set; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; -import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; import org.apache.apex.malhar.lib.utils.serde.SliceUtils; @@ -60,7 +60,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.Spill private int batchSize = DEFAULT_BATCH_SIZE; @NotNull - private SpillableByteMapImpl<byte[], Integer> map; + private SpillableByteMapImpl<Slice, Integer> map; private SpillableStateStore store; private byte[] identifier; private long bucket; @@ -91,7 +91,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.Spill this.serdeKey = Preconditions.checkNotNull(serdeKey); this.serdeValue = Preconditions.checkNotNull(serdeValue); - map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); + map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice()); } public SpillableStateStore getStore() @@ -111,7 +111,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.Spill if (spillableArrayList == null) { Slice keySlice = serdeKey.serialize(key); - Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray()); + Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX)); if (size == null) { return null; @@ -166,6 +166,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.Spill @Override public int size() { + // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys return map.size(); } @@ -179,7 +180,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.Spill public boolean containsKey(@Nullable Object key) { return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), - SIZE_KEY_SUFFIX).toByteArray()); + SIZE_KEY_SUFFIX)); } @Override @@ -191,7 +192,23 @@ public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.Spill @Override public boolean containsEntry(@Nullable Object key, @Nullable Object value) { - throw new UnsupportedOperationException(); + SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key); + if (spillableArrayList == null) { + return false; + } + for (int i = 0; i < spillableArrayList.size(); i++) { + V v = spillableArrayList.get(i); + if (v == null) { + if (value == null) { + return true; + } + } else { + if (v.equals(value)) { + return true; + } + } + } + return false; } @Override @@ -275,7 +292,7 @@ public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.Spill SpillableArrayListImpl<V> spillableArrayList = cache.get(key); spillableArrayList.endWindow(); - Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX).toByteArray(), + Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX), spillableArrayList.size()); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e082133a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java index 81063b8..2c9d7eb 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java @@ -239,7 +239,6 @@ public class SpillableByteArrayListMultimapImplTest map.endWindow(); store.endWindow(); - store.beforeCheckpoint(nextWindowId); return nextWindowId; } @@ -258,12 +257,13 @@ public class SpillableByteArrayListMultimapImplTest long nextWindowId = 0L; nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); long activationWindow = nextWindowId; - nextWindowId++; - + store.beforeCheckpoint(nextWindowId); SpillableByteArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map); store.checkpointed(nextWindowId); store.committed(nextWindowId); + nextWindowId++; + store.beginWindow(nextWindowId); map.beginWindow(nextWindowId); @@ -319,7 +319,7 @@ public class SpillableByteArrayListMultimapImplTest store.setup(context); map.setup(context); - + nextWindowId = activationWindow + 1; store.beginWindow(nextWindowId); map.beginWindow(nextWindowId);
