Repository: apex-malhar
Updated Branches:
  refs/heads/master 6dcd82120 -> 570ecaeb7
APEXMALHAR-2350 #resolve #comment The key and value stream should match with the bucket


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/4edbec89
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/4edbec89
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/4edbec89

Branch: refs/heads/master
Commit: 4edbec89e4b6855e2f4a7517ca5538df0ad2a35e
Parents: a202cdc
Author: brightchen <[email protected]>
Authored: Mon Nov 28 16:17:15 2016 -0800
Committer: brightchen <[email protected]>
Committed: Mon Mar 6 11:29:44 2017 -0800

----------------------------------------------------------------------
 .../managed/ManagedTimeUnifiedStateImpl.java    |   2 +-
 .../lib/state/spillable/SpillableMapImpl.java   |  37 ++++++-
 .../lib/utils/serde/KeyValueSerdeManager.java   |  34 ++++++-
 .../state/spillable/SpillableMapImplTest.java   | 112 +++++++++++++++++++
 4 files changed, 177 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
index 62ebbc5..82d381c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -216,7 +216,7 @@ public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implem
   @Override
   protected void addBucketName(long bucketId)
   {
-    long operatorId = (long)managedStateContext.getOperatorContext().getId();
+    long operatorId = managedStateContext.getOperatorContext().getId();
     if (!bucketNamesOnFS.contains(operatorId)) {
       bucketNamesOnFS.add(operatorId);
     }


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index e7071a2..56a3b0e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
 import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
 import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
@@ -150,7 +151,7 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
       return val;
     }
 
-    Slice valSlice = store.getSync(getBucket(key), keyValueSerdeManager.serializeDataKey(key, false));
+    Slice valSlice = store.getSync(getBucketTimeOrId(key), keyValueSerdeManager.serializeDataKey(key, false));
 
     if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
       return null;
@@ -236,13 +237,15 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   @Override
   public void endWindow()
   {
     for (K key: cache.getChangedKeys()) {
-      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true),
+      long timeOrBucketId = prepareBuffersForKey(key);
+      store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true),
           keyValueSerdeManager.serializeValue(cache.get(key)));
     }
 
     for (K key: cache.getRemovedKeys()) {
-      store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
+      long timeOrBucketId = prepareBuffersForKey(key);
+      store.put(timeOrBucketId, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
     }
     cache.endWindow();
     keyValueSerdeManager.resetReadBuffer();
@@ -253,7 +256,33 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   {
   }
 
-  private long getBucket(K key)
+  /**
+   * Points the write buffers of the serde manager at the bucket that will receive {@code key}, so that the
+   * serialized key and value are backed by that bucket's key and value streams.
+   *
+   * @param key the key about to be written to the store
+   * @return the first argument to pass to {@code store.put()} for this key
+   */
+  private long prepareBuffersForKey(K key)
+  {
+    long timeOrBucketId = getBucketTimeOrId(key);
+    long bucketId = timeOrBucketId;
+    if (store instanceof ManagedTimeUnifiedStateImpl) {
+      // a time unified store is addressed by time; its time bucket assigner maps that time to the bucket id
+      bucketId = ((ManagedTimeUnifiedStateImpl)store).getTimeBucketAssigner().getTimeBucket(timeOrBucketId);
+    }
+    keyValueSerdeManager.updateBuffersForBucketChange(bucketId);
+    return timeOrBucketId;
+  }
+
+  /**
+   * Returns the value used as the first argument of the store's get/put calls for {@code key}.
+   *
+   * @param key the map key
+   * @return the key's time when a {@link TimeExtractor} is configured (the bucket time for a time unified
+   *         store), otherwise the fixed bucket id of this map
+   */
+  private long getBucketTimeOrId(K key)
   {
     return timeExtractor != null ? timeExtractor.getTime(key) : bucket;
   }


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
index e74c7a3..405683b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -28,6 +28,7 @@ import com.datatorrent.netlet.util.Slice;
  */
 public class KeyValueSerdeManager<K, V>
 {
+  public static final long INVALID_BUCKET_ID = -1;
   protected Serde<K> keySerde;
   protected Serde<V> valueSerde;
 
@@ -36,6 +37,8 @@ public class KeyValueSerdeManager<K, V>
 
   protected SerializationBuffer valueBuffer;
 
+  private long lastBucketId = INVALID_BUCKET_ID;
+  private transient BucketProvider bucketProvider;
 
   protected KeyValueSerdeManager()
   {
@@ -50,13 +53,38 @@ public class KeyValueSerdeManager<K, V>
 
   public void setup(BucketProvider bp, long bucketId)
   {
-    //the bucket will not change for this class. so get streams from setup, else, need to set stream before serialize
-    Bucket bucketInst = bp.ensureBucket(bucketId);
-    this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
+    bucketProvider = bp;
+    // lastBucketId is not transient, so after deserialization (e.g. from a checkpoint) it may already equal
+    // bucketId; reset it so the buffers are always rebuilt against the streams of this provider's bucket.
+    lastBucketId = INVALID_BUCKET_ID;
+    updateBuffersForBucketChange(bucketId);
+  }
 
+  /**
+   * Points the write buffers (key and value) at the streams of the given bucket. The bucket a map writes to
+   * can change between calls (e.g. with a time unified store), and the serialized key and value must be
+   * backed by the streams of the bucket they are written to. Does nothing when {@code bucketId} is the
+   * bucket the buffers are already bound to.
+   *
+   * @param bucketId id of the bucket the next key/value pair will be written to
+   * @throws IllegalStateException if no bucket provider has been supplied through setup()
+   */
+  public void updateBuffersForBucketChange(long bucketId)
+  {
+    if (bucketProvider == null) {
+      throw new IllegalStateException("setup() must be called before binding the buffers to a bucket");
+    }
+    if (lastBucketId == bucketId) {
+      return;
+    }
+
+    Bucket bucketInst = bucketProvider.ensureBucket(bucketId);
+    this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
     keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream());
+
+    lastBucketId = bucketId;
   }
 
   public Slice serializeKey(K key, boolean write)
   {
     SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/4edbec89/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index 760bc5c..ce51a03 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -25,8 +25,13 @@ import org.junit.runner.RunWith;
 
 import org.apache.apex.malhar.lib.state.managed.TimeExtractor;
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
 import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
+import com.google.common.base.Preconditions;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -452,4 +457,111 @@ public class SpillableMapImplTest
 
     map1.teardown();
   }
+
+  /**
+   * Exposes the write buffers of the serde manager so a test can observe when they are rebound.
+   */
+  protected static class SerdeManagerForTest<K, V> extends AffixKeyValueSerdeManager<K, V>
+  {
+    public SerdeManagerForTest(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde)
+    {
+      super(metaKeySuffix, dataKeyIdentifier, keySerde, valueSerde);
+    }
+
+    public SerializationBuffer getValueBuffer()
+    {
+      return valueBuffer;
+    }
+
+    public SerializationBuffer getKeyBufferForWrite()
+    {
+      return keyBufferForWrite;
+    }
+  }
+
+  /**
+   * A {@link SpillableMapImpl} whose serde manager is a {@link SerdeManagerForTest}.
+   */
+  protected static class SpillableMapImplForTest<K, V> extends SpillableMapImpl<K, V>
+  {
+    protected SerdeManagerForTest<K, V> serdeManager;
+
+    public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> serdeKey,
+        Serde<V> serdeValue)
+    {
+      super(store, identifier, bucket, serdeKey, serdeValue);
+      serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey),
+          Preconditions.checkNotNull(serdeValue));
+      keyValueSerdeManager = serdeManager;
+    }
+
+    public SpillableMapImplForTest(SpillableStateStore store, byte[] identifier, Serde<K> serdeKey,
+        Serde<V> serdeValue, TimeExtractor<K> timeExtractor)
+    {
+      super(store, identifier, serdeKey, serdeValue, timeExtractor);
+      serdeManager = new SerdeManagerForTest<>(null, identifier, Preconditions.checkNotNull(serdeKey),
+          Preconditions.checkNotNull(serdeValue));
+      keyValueSerdeManager = serdeManager;
+    }
+  }
+
+  /**
+   * Checks that the write buffers of the serde manager are rebound when consecutive windows write
+   * keys that land in different buckets.
+   */
+  @Test
+  @Parameters({"TimeUnifiedManagedState"})
+  public void serializationBufferTest(String opt)
+  {
+    SerializationBuffer keyBuffer = null;
+    SerializationBuffer valueBuffer = null;
+    SerializationBuffer currentBuffer;
+
+    setup(opt);
+    SpillableMapImplForTest<String, String> map;
+    if (te == null) {
+      map = new SpillableMapImplForTest<>(store, ID1, 0L, new StringSerde(), new StringSerde());
+    } else {
+      map = new SpillableMapImplForTest<>(store, ID1, new StringSerde(), new StringSerde(), te);
+    }
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    map.put("a", "1");
+
+    map.endWindow();
+    store.endWindow();
+
+    currentBuffer = map.serdeManager.getKeyBufferForWrite();
+    Assert.assertNotSame(keyBuffer, currentBuffer);
+    keyBuffer = currentBuffer;
+
+    currentBuffer = map.serdeManager.getValueBuffer();
+    Assert.assertNotSame(valueBuffer, currentBuffer);
+    valueBuffer = currentBuffer;
+
+    ++windowId;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    // a different key is used in each window so that the put goes to a different bucket
+    map.put("b", "2");
+
+    map.endWindow();
+    store.endWindow();
+
+    currentBuffer = map.serdeManager.getKeyBufferForWrite();
+    Assert.assertNotSame(keyBuffer, currentBuffer);
+
+    currentBuffer = map.serdeManager.getValueBuffer();
+    Assert.assertNotSame(valueBuffer, currentBuffer);
+
+    map.teardown();
+    store.teardown();
+  }
 }
