Repository: apex-malhar
Updated Branches:
  refs/heads/master 58201fbf9 -> baff632ae


APEXMALHAR-2276 over writing the value of a key in the same time bucket


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5a0d1ebc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5a0d1ebc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5a0d1ebc

Branch: refs/heads/master
Commit: 5a0d1ebc07900811cbe57aa7f2867d9e4574d4cf
Parents: 139d89f
Author: Chandni Singh <[email protected]>
Authored: Fri Sep 30 13:01:09 2016 -0700
Committer: Chandni Singh <[email protected]>
Committed: Fri Sep 30 14:24:15 2016 -0700

----------------------------------------------------------------------
 .../apex/malhar/lib/state/managed/Bucket.java    |  5 ++++-
 .../managed/ManagedTimeUnifiedStateImplTest.java | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5a0d1ebc/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 0ed1865..74364a9 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -389,12 +389,15 @@ public interface Bucket extends ManagedStateComponent
         sizeInBytes.getAndAdd(key.length);
         sizeInBytes.getAndAdd(Long.SIZE);
       }
-      if (timeBucket > bucketedValue.getTimeBucket()) {
+      if (timeBucket >= bucketedValue.getTimeBucket()) {
 
         int inc = null == bucketedValue.getValue() ? value.length : 
value.length - bucketedValue.getValue().length;
         sizeInBytes.getAndAdd(inc);
         bucketedValue.setTimeBucket(timeBucket);
         bucketedValue.setValue(value);
+      } else {
+        LOG.warn("ignoring {} {} {}; existing {} {}", key, value, timeBucket,
+            bucketedValue.getValue(), bucketedValue.getTimeBucket());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5a0d1ebc/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 6808b63..25596fa 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -88,6 +88,25 @@ public class ManagedTimeUnifiedStateImplTest
   }
 
   @Test
+  public void testPutWithMultipleValuesForAKey()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(0);
+    testMeta.managedState.put(time, one, one);
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.put(time, one, two);
+    Slice value = testMeta.managedState.getSync(time, one);
+    testMeta.managedState.endWindow();
+
+    Assert.assertEquals("value overwritten", two, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
   public void testAsyncGet() throws ExecutionException, InterruptedException
   {
     Slice one = ManagedStateTestUtils.getSliceFor("1");

Reply via email to