Repository: apex-malhar
Updated Branches:
  refs/heads/master 170072533 -> 255bc11c5


APEXMALHAR-2197 #resolve #comment fix TimeBasedPriorityQueue.removeLRU throws 
NoSuchElementException


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/777d47d5
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/777d47d5
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/777d47d5

Branch: refs/heads/master
Commit: 777d47d5bae8df78025f214af239293a0c7dae8c
Parents: 9b6e11d
Author: brightchen <[email protected]>
Authored: Thu Aug 18 09:50:55 2016 -0700
Committer: brightchen <[email protected]>
Committed: Mon Aug 22 15:58:50 2016 -0700

----------------------------------------------------------------------
 .../state/spillable/TimeBasedPriorityQueue.java | 13 +++++++--
 .../SpillableByteArrayListMultimapImplTest.java | 30 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/777d47d5/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
index 025c501..f28e0b2 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
@@ -115,8 +115,17 @@ public class TimeBasedPriorityQueue<T>
       } else if (this.timestamp > timeWrapper.getTimestamp()) {
         return 1;
       }
-
-      return 0;
+      
+      /**
+       * NOTE: the following use the equals() to implement the compareTo() for 
key.
+       * it should be OK as the compareTo() only used by 
TimeBasedPriorityQueue.sortedTimestamp, 
+       * which only care about the order of time ( the order for key doesn't 
matter ).
+       * But would cause problem if add other function which depended on the 
order of the key.
+       * 
+       * Add compare by hashCode when not equals in order to compatible with 
the interface for most cases.
+       * Anyway, the order of key is not guaranteed. And we should not return 
0 if not equals
+       */
+      return key.equals(timeWrapper.key) ? 0 : (hashCode() - 
timeWrapper.hashCode() <= 0 ? -1 : 1);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/777d47d5/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
index 42d7d20..81063b8 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.state.spillable;
 
 import java.util.List;
+import java.util.Random;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -338,4 +339,33 @@ public class SpillableByteArrayListMultimapImplTest
     map.teardown();
     store.teardown();
   }
+  
+  @Test
+  public void testLoad()
+  {
+    Random random = new Random();
+    final int keySize = 1000000;
+    final int valueSize = 100000000;
+    final int numOfEntry = 100000;
+
+    SpillableStateStore store = testMeta.store;
+    
+    SpillableByteArrayListMultimapImpl<String, String> multimap = new 
SpillableByteArrayListMultimapImpl<>(
+        this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new 
SerdeStringSlice());
+    
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    Context.OperatorContext context =
+        new 
OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(),
 attributes);
+    store.setup(context);
+    multimap.setup(context);
+
+    store.beginWindow(1);
+    multimap.beginWindow(1);
+    for (int i = 0; i < numOfEntry; ++i) {
+      multimap.put(String.valueOf(random.nextInt(keySize)), 
String.valueOf(random.nextInt(valueSize)));
+    }
+    multimap.endWindow();
+    store.endWindow();
+  }
 }

Reply via email to