Repository: apex-malhar
Updated Branches:
  refs/heads/master 6dcd82120 -> 570ecaeb7


APEXMALHAR-2350 #resolve #comment The key and value streams should match the bucket being written to


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4edbec89
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4edbec89
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4edbec89

Branch: refs/heads/master
Commit: 4edbec89e4b6855e2f4a7517ca5538df0ad2a35e
Parents: a202cdc
Author: brightchen <[email protected]>
Authored: Mon Nov 28 16:17:15 2016 -0800
Committer: brightchen <[email protected]>
Committed: Mon Mar 6 11:29:44 2017 -0800

----------------------------------------------------------------------
 .../managed/ManagedTimeUnifiedStateImpl.java    |   2 +-
 .../lib/state/spillable/SpillableMapImpl.java   |  30 +++++-
 .../lib/utils/serde/KeyValueSerdeManager.java   |  24 ++++-
 .../state/spillable/SpillableMapImplTest.java   | 103 +++++++++++++++++++
 4 files changed, 151 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index 62ebbc5..82d381c 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -216,7 +216,7 @@ public class ManagedTimeUnifiedStateImpl extends 
AbstractManagedStateImpl implem
     @Override
     protected void addBucketName(long bucketId)
     {
-      long operatorId = (long)managedStateContext.getOperatorContext().getId();
+      long operatorId = managedStateContext.getOperatorContext().getId();
       if (!bucketNamesOnFS.contains(operatorId)) {
         bucketNamesOnFS.add(operatorId);
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index e7071a2..56a3b0e 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
 import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
@@ -150,7 +151,7 @@ public class SpillableMapImpl<K, V> implements 
Spillable.SpillableMap<K, V>, Spi
       return val;
     }
 
-    Slice valSlice = store.getSync(getBucket(key), 
keyValueSerdeManager.serializeDataKey(key, false));
+    Slice valSlice = store.getSync(getBucketTimeOrId(key), 
keyValueSerdeManager.serializeDataKey(key, false));
 
     if (valSlice == null || valSlice == BucketedState.EXPIRED || 
valSlice.length == 0) {
       return null;
@@ -236,13 +237,29 @@ public class SpillableMapImpl<K, V> implements 
Spillable.SpillableMap<K, V>, Spi
   @Override
   public void endWindow()
   {
+    boolean isTimeUnifiedStore = (store instanceof 
ManagedTimeUnifiedStateImpl);
     for (K key: cache.getChangedKeys()) {
-      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, 
true),
+      //the getBucket() returned in fact is time, the bucket assign then 
assigned the bucketId
+      long timeOrBucketId = bucket;
+      long bucketId = timeOrBucketId;
+      if (isTimeUnifiedStore) {
+        timeOrBucketId = getBucketTimeOrId(key);
+        bucketId = 
((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId);
+      }
+      keyValueSerdeManager.updateBuffersForBucketChange(bucketId);
+      store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, 
true),
           keyValueSerdeManager.serializeValue(cache.get(key)));
     }
 
     for (K key: cache.getRemovedKeys()) {
-      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, 
true), BufferSlice.EMPTY_SLICE);
+      long timeOrBucketId = bucket;
+      long bucketId = timeOrBucketId;
+      if (isTimeUnifiedStore) {
+        timeOrBucketId = getBucketTimeOrId(key);
+        bucketId = 
((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId);
+      }
+      keyValueSerdeManager.updateBuffersForBucketChange(bucketId);
+      store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, 
true), BufferSlice.EMPTY_SLICE);
     }
     cache.endWindow();
     keyValueSerdeManager.resetReadBuffer();
@@ -253,7 +270,12 @@ public class SpillableMapImpl<K, V> implements 
Spillable.SpillableMap<K, V>, Spi
   {
   }
 
-  private long getBucket(K key)
+  /**
+   *
+   * @param key
+   * @return The bucket time for time unified store or bucket id for store 
with fixed bucket
+   */
+  private long getBucketTimeOrId(K key)
   {
     return timeExtractor != null ? timeExtractor.getTime(key) : bucket;
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
index e74c7a3..405683b 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -28,6 +28,7 @@ import com.datatorrent.netlet.util.Slice;
  */
 public class KeyValueSerdeManager<K, V>
 {
+  public static final long INVALID_BUCKET_ID = -1;
   protected Serde<K> keySerde;
   protected Serde<V> valueSerde;
 
@@ -36,6 +37,8 @@ public class KeyValueSerdeManager<K, V>
 
   protected SerializationBuffer valueBuffer;
 
+  private long lastBucketId = INVALID_BUCKET_ID;
+  private transient BucketProvider bucketProvider;
 
   protected KeyValueSerdeManager()
   {
@@ -50,13 +53,28 @@ public class KeyValueSerdeManager<K, V>
 
   public void setup(BucketProvider bp, long bucketId)
   {
-    //the bucket will not change for this class. so get streams from setup, 
else, need to set stream before serialize
-    Bucket bucketInst = bp.ensureBucket(bucketId);
-    this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
+    bucketProvider = bp;
+    updateBuffersForBucketChange(bucketId);
+  }
 
+  /**
+   * The bucket can be changed. The write buffer should also be changed if 
bucket changed.
+   * @param bucketId
+   */
+  public void updateBuffersForBucketChange(long bucketId)
+  {
+    if (lastBucketId == bucketId) {
+      return;
+    }
+
+    Bucket bucketInst = bucketProvider.ensureBucket(bucketId);
+    this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
     keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream());
+
+    lastBucketId = bucketId;
   }
 
+
   public Slice serializeKey(K key, boolean write)
   {
     SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index 760bc5c..ce51a03 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -25,8 +25,13 @@ import org.junit.runner.RunWith;
 
 import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import 
org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
 import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
+import com.google.common.base.Preconditions;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -452,4 +457,102 @@ public class SpillableMapImplTest
 
     map1.teardown();
   }
+
+
+  protected static class SerdeManagerForTest<K, V> extends 
AffixKeyValueSerdeManager<K, V>
+  {
+    public SerdeManagerForTest(byte[] metaKeySuffix, byte[] dataKeyIdentifier, 
Serde<K> keySerde, Serde<V> valueSerde)
+    {
+      super(metaKeySuffix, dataKeyIdentifier, keySerde, valueSerde);
+    }
+
+    public SerializationBuffer getValueBuffer()
+    {
+      return valueBuffer;
+    }
+
+    public SerializationBuffer getKeyBufferForWrite()
+    {
+      return keyBufferForWrite;
+    }
+  }
+
+  protected static class SpillableMapImplForTest<K, V> extends 
SpillableMapImpl<K, V>
+  {
+    protected SerdeManagerForTest<K, V> serdeManager;
+
+    public SpillableMapImplForTest(SpillableStateStore store, byte[] 
identifier, long bucket, Serde<K> serdeKey,
+        Serde<V> serdeValue)
+    {
+      super(store, identifier, bucket, serdeKey, serdeValue);
+      serdeManager = new SerdeManagerForTest<>(null, identifier, 
Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+      keyValueSerdeManager = serdeManager;
+    }
+
+    public SpillableMapImplForTest(SpillableStateStore store, byte[] 
identifier, Serde<K> serdeKey,
+        Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+    {
+      super(store, identifier, serdeKey, serdeValue, timeExtractor);
+      serdeManager = new SerdeManagerForTest<>(null, identifier, 
Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue));
+      keyValueSerdeManager = serdeManager;
+    }
+  }
+
+  @Test
+  @Parameters({"TimeUnifiedManagedState"})
+  public void serializationBufferTest(String opt)
+  {
+    SerializationBuffer keyBuffer = null;
+    SerializationBuffer valueBuffer = null;
+    SerializationBuffer currentBuffer;
+
+    setup(opt);
+    SpillableMapImplForTest<String, String> map;
+    if (te == null) {
+      map = new SpillableMapImplForTest<>(store,ID1,0L,new StringSerde(), new 
StringSerde());
+    } else {
+      map = new SpillableMapImplForTest<>(store,ID1,new StringSerde(), new 
StringSerde(), te);
+    }
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    map.put("a", "1");
+
+    map.endWindow();
+    store.endWindow();
+
+    currentBuffer = map.serdeManager.getKeyBufferForWrite();
+    Assert.assertTrue(currentBuffer != keyBuffer);
+    keyBuffer = currentBuffer;
+
+    currentBuffer = map.serdeManager.getValueBuffer();
+    Assert.assertTrue(currentBuffer != valueBuffer);
+    valueBuffer = currentBuffer;
+
+    ++windowId;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    //each put use different key to make sure use the different bucket
+    map.put("b", "2");
+
+    map.endWindow();
+    store.endWindow();
+
+    currentBuffer = map.serdeManager.getKeyBufferForWrite();
+    Assert.assertTrue(currentBuffer != keyBuffer);
+    keyBuffer = currentBuffer;
+
+    currentBuffer = map.serdeManager.getValueBuffer();
+    Assert.assertTrue(currentBuffer != valueBuffer);
+    valueBuffer = currentBuffer;
+
+    map.teardown();
+    store.teardown();
+  }
 }

Reply via email to