Repository: apex-malhar Updated Branches: refs/heads/master 58201fbf9 -> baff632ae
APEXMALHAR-2276 overwriting the value of a key in the same time bucket Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5a0d1ebc Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5a0d1ebc Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5a0d1ebc Branch: refs/heads/master Commit: 5a0d1ebc07900811cbe57aa7f2867d9e4574d4cf Parents: 139d89f Author: Chandni Singh <[email protected]> Authored: Fri Sep 30 13:01:09 2016 -0700 Committer: Chandni Singh <[email protected]> Committed: Fri Sep 30 14:24:15 2016 -0700 ---------------------------------------------------------------------- .../apex/malhar/lib/state/managed/Bucket.java | 5 ++++- .../managed/ManagedTimeUnifiedStateImplTest.java | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5a0d1ebc/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java index 0ed1865..74364a9 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -389,12 +389,15 @@ public interface Bucket extends ManagedStateComponent sizeInBytes.getAndAdd(key.length); sizeInBytes.getAndAdd(Long.SIZE); } - if (timeBucket > bucketedValue.getTimeBucket()) { + if (timeBucket >= bucketedValue.getTimeBucket()) { int inc = null == bucketedValue.getValue() ? 
value.length : value.length - bucketedValue.getValue().length; sizeInBytes.getAndAdd(inc); bucketedValue.setTimeBucket(timeBucket); bucketedValue.setValue(value); + } else { + LOG.warn("ignoring {} {} {}; existing {} {}", key, value, timeBucket, + bucketedValue.getValue(), bucketedValue.getTimeBucket()); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5a0d1ebc/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java index 6808b63..25596fa 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java @@ -88,6 +88,25 @@ public class ManagedTimeUnifiedStateImplTest } @Test + public void testPutWithMultipleValuesForAKey() + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(0); + testMeta.managedState.put(time, one, one); + + Slice two = ManagedStateTestUtils.getSliceFor("2"); + testMeta.managedState.put(time, one, two); + Slice value = testMeta.managedState.getSync(time, one); + testMeta.managedState.endWindow(); + + Assert.assertEquals("value overwritten", two, value); + testMeta.managedState.teardown(); + } + + @Test public void testAsyncGet() throws ExecutionException, InterruptedException { Slice one = ManagedStateTestUtils.getSliceFor("1");
