Repository: apex-malhar Updated Branches: refs/heads/master 170072533 -> 255bc11c5
APEXMALHAR-2197 #resolve #comment fix TimeBasedPriorityQueue.removeLRU throws NoSuchElementException Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/777d47d5 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/777d47d5 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/777d47d5 Branch: refs/heads/master Commit: 777d47d5bae8df78025f214af239293a0c7dae8c Parents: 9b6e11d Author: brightchen <[email protected]> Authored: Thu Aug 18 09:50:55 2016 -0700 Committer: brightchen <[email protected]> Committed: Mon Aug 22 15:58:50 2016 -0700 ---------------------------------------------------------------------- .../state/spillable/TimeBasedPriorityQueue.java | 13 +++++++-- .../SpillableByteArrayListMultimapImplTest.java | 30 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/777d47d5/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java index 025c501..f28e0b2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java @@ -115,8 +115,17 @@ public class TimeBasedPriorityQueue<T> } else if (this.timestamp > timeWrapper.getTimestamp()) { return 1; } - - return 0; + + /** + * NOTE: the following use the equals() to implement the compareTo() for key. + * it should be OK as the compareTo() only used by TimeBasedPriorityQueue.sortedTimestamp, + * which only care about the order of time ( the order for key doesn't matter ). + * But would cause problem if add other function which depended on the order of the key. + * + * Add compare by hashCode when not equals in order to compatible with the interface for most cases. + * Anyway, the order of key is not guaranteed. And we should not return 0 if not equals + */ + return key.equals(timeWrapper.key) ? 0 : (hashCode() - timeWrapper.hashCode() <= 0 ? -1 : 1); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/777d47d5/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java index 42d7d20..81063b8 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java @@ -19,6 +19,7 @@ package org.apache.apex.malhar.lib.state.spillable; import java.util.List; +import java.util.Random; import org.junit.Assert; import org.junit.Rule; @@ -338,4 +339,33 @@ public class SpillableByteArrayListMultimapImplTest map.teardown(); store.teardown(); } + + @Test + public void testLoad() + { + Random random = new Random(); + final int keySize = 1000000; + final int valueSize = 100000000; + final int numOfEntry = 100000; + + SpillableStateStore store = testMeta.store; + + SpillableByteArrayListMultimapImpl<String, String> multimap = new SpillableByteArrayListMultimapImpl<>( + this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice()); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + Context.OperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes); + store.setup(context); + multimap.setup(context); + + store.beginWindow(1); + multimap.beginWindow(1); + for (int i = 0; i < numOfEntry; ++i) { + multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize))); + } + multimap.endWindow(); + store.endWindow(); + } }
