Repository: apex-malhar Updated Branches: refs/heads/master 27272a588 -> 16edf3067
APEXMALHAR-2244 #comment Use TimeUnifiedManageStateStore for Spillable Data Structure Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/16edf306 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/16edf306 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/16edf306 Branch: refs/heads/master Commit: 16edf3067226735682ecce63eedd0b120a708427 Parents: 27272a5 Author: Siyuan Hua <[email protected]> Authored: Thu Oct 27 14:06:51 2016 -0700 Committer: Siyuan Hua <[email protected]> Committed: Mon Oct 31 11:14:04 2016 -0700 ---------------------------------------------------------------------- .../state/managed/AbstractManagedStateImpl.java | 2 +- .../malhar/lib/state/managed/TimeExtractor.java | 27 +++ .../spillable/SpillableComplexComponent.java | 40 ++++ .../SpillableComplexComponentImpl.java | 44 +++- .../lib/state/spillable/SpillableMapImpl.java | 45 +++- .../lib/state/spillable/SpillableSetImpl.java | 23 +- .../spillable/SpillableSetMultimapImpl.java | 95 ++++++-- ...agedTimeUnifiedStateSpillableStateStore.java | 29 +++ .../impl/SpillableWindowedKeyedStorage.java | 4 +- .../impl/SpillableWindowedPlainStorage.java | 2 +- .../window/impl/WindowKeyPairTimeExtractor.java | 40 ++++ .../lib/window/impl/WindowTimeExtractor.java | 35 +++ .../state/spillable/SpillableMapImplTest.java | 234 ++++++++----------- .../state/spillable/SpillableSetImplTest.java | 27 ++- .../spillable/SpillableSetMultimapImplTest.java | 21 +- .../lib/state/spillable/SpillableTestUtils.java | 4 + .../spillable/TestStringTimeExtractor.java | 36 +++ .../window/SpillableWindowedStorageTest.java | 19 +- .../malhar/lib/window/WindowedOperatorTest.java | 90 +++---- 19 files changed, 580 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java index 20271b0..1c52c31 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -316,7 +316,7 @@ public abstract class AbstractManagedStateImpl protected int getBucketIdx(long bucketId) { - return (int)(bucketId % numBuckets); + return (int)Math.abs(bucketId % numBuckets); } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java new file mode 100644 index 0000000..e70e80f --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeExtractor.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +/** + * A way to extract time from data + */ +public interface TimeExtractor<T> +{ + long getTime(T t); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java index 542a914..b6ec6a2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java @@ -18,6 +18,7 @@ */ package org.apache.apex.malhar.lib.state.spillable; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.state.spillable.Spillable.SpillableComponent; import org.apache.apex.malhar.lib.utils.serde.Serde; @@ -68,6 +69,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S Serde<V> serdeValue); /** + * This is a method for creating a {@link SpillableMap}. This method + * auto-generates an identifier for the data structure. + * @param <K> The type of the keys. + * @param <V> The type of the values. + * @param serdeKey The Serializer/Deserializer to use for the map's keys. 
+ * @param serdeValue The Serializer/Deserializer to use for the map's values. + * @param timeExtractor a util object to extract time from key. + * @return A {@link SpillableMap}. + */ + <K, V> SpillableMap<K, V> newSpillableMap(Serde<K> serdeKey, + Serde<V> serdeValue, TimeExtractor<K> timeExtractor); + + /** * This is a method for creating a {@link SpillableMap}. * @param <K> The type of the keys. * @param <V> The type of the values. @@ -81,6 +95,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S Serde<K> serdeKey, Serde<V> serdeValue); /** + * This is a method for creating a {@link SpillableMap}. + * @param <K> The type of the keys. + * @param <V> The type of the values. + * @param identifier The identifier for this {@link SpillableMap}. + * @param serdeKey The Serializer/Deserializer to use for the map's keys. + * @param serdeValue The Serializer/Deserializer to use for the map's values. + * @param timeExtractor a util object to extract time from key. + * @return A {@link SpillableMap}. + */ + <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, + Serde<K> serdeKey, Serde<V> serdeValue, TimeExtractor<K> timeExtractor); + + /** * This is a method for creating a {@link SpillableListMultimap}. This method * auto-generates an identifier for the data structure. * @param <K> The type of the keys. @@ -118,6 +145,19 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue); /** + * This is a method for creating a {@link SpillableSetMultimap}. + * @param <K> The type of the keys. + * @param <V> The type of the values in the map's lists. + * @param bucket The bucket that this {@link SpillableSetMultimap} will be spilled to. + * @param serdeKey The Serializer/Deserializer to use for the map's keys. + * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. 
+ * @param timeExtractor a util object to extract time from key. + * @return A {@link SpillableSetMultimap}. + */ + <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, + Serde<V> serdeValue, TimeExtractor<K> timeExtractor); + + /** * This is a method for creating a {@link SpillableMultiset}. This method * auto-generates an identifier for the data structure. * @param <T> The type of the elements. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java index 1a3f550..1d9fbc6 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java @@ -23,6 +23,7 @@ import java.util.Set; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.hadoop.classification.InterfaceStability; @@ -75,7 +76,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent @Override public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde) { - SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde); + SpillableArrayListImpl<T> list = new SpillableArrayListImpl<>(bucket, identifierGenerator.next(), store, serde); componentList.add(list); return list; } @@ -84,7 +85,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent public <T> SpillableList<T> 
newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde) { identifierGenerator.register(identifier); - SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde); + SpillableArrayListImpl<T> list = new SpillableArrayListImpl<>(bucket, identifier, store, serde); bucketIds.add(bucket); componentList.add(list); return list; @@ -94,7 +95,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue) { - SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(), + SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); bucketIds.add(bucket); componentList.add(map); @@ -106,16 +107,35 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent Serde<V> serdeValue) { identifierGenerator.register(identifier); - SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue); + SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifier, bucket, serdeKey, serdeValue); bucketIds.add(bucket); componentList.add(map); return map; } @Override + public <K, V> SpillableMap<K, V> newSpillableMap(Serde<K> serdeKey, + Serde<V> serdeValue, TimeExtractor<K> timeExtractor) + { + SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifierGenerator.next(), serdeKey, serdeValue, timeExtractor); + componentList.add(map); + return map; + } + + @Override + public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, Serde<K> serdeKey, + Serde<V> serdeValue, TimeExtractor<K> timeExtractor) + { + identifierGenerator.register(identifier); + SpillableMapImpl<K, V> map = new SpillableMapImpl<>(store, identifier, serdeKey, serdeValue, timeExtractor); + componentList.add(map); + return map; + } + + @Override public <K, V> SpillableListMultimap<K, 
V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue) { - SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store, + SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); bucketIds.add(bucket); componentList.add(map); @@ -128,7 +148,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent Serde<V> serdeValue) { identifierGenerator.register(identifier); - SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store, + SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<>(store, identifier, bucket, serdeKey, serdeValue); bucketIds.add(bucket); componentList.add(map); @@ -138,7 +158,7 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent @Override public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue) { - SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store, + SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); bucketIds.add(bucket); componentList.add(map); @@ -146,6 +166,16 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent } @Override + public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, + Serde<V> serdeValue, TimeExtractor<K> timeExtractor) + { + SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<>(store, + identifierGenerator.next(), bucket, serdeKey, serdeValue, timeExtractor); + componentList.add(map); + return map; + } + + @Override public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde) { throw new UnsupportedOperationException("Unsupported Operation"); 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java index 5fa39d7..e7071a2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java @@ -26,6 +26,7 @@ import java.util.Set; import javax.validation.constraints.NotNull; import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager; import org.apache.apex.malhar.lib.utils.serde.BufferSlice; import org.apache.apex.malhar.lib.utils.serde.Serde; @@ -55,10 +56,11 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>(); private transient Input tmpInput = new Input(); + private TimeExtractor<K> timeExtractor; + @NotNull private SpillableStateStore store; - @NotNull - private byte[] identifier; + private long bucket; private int size = 0; @@ -76,16 +78,32 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi * @param identifier The Id of this {@link SpillableMapImpl}. * @param bucket The Id of the bucket used to store this * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}. - * @param keySerde The {@link Serde} to use when serializing and deserializing keys. - * @param keySerde The {@link Serde} to use when serializing and deserializing values. + * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. 
+ * @param serdeValue The {@link Serde} to use when serializing and deserializing values. */ - public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde, - Serde<V> valueSerde) + public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> serdeKey, + Serde<V> serdeValue) { this.store = Preconditions.checkNotNull(store); - this.identifier = Preconditions.checkNotNull(identifier); this.bucket = bucket; - keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde)); + keyValueSerdeManager = new AffixKeyValueSerdeManager<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue)); + } + + /** + * Creates a {@link SpillableMapImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableMapImpl}. + * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}. + * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. + * @param serdeValue The {@link Serde} to use when serializing and deserializing values. 
+ * @param timeExtractor Extract time from each element and use it to decide where the data goes + */ + public SpillableMapImpl(SpillableStateStore store, byte[] identifier, Serde<K> serdeKey, + Serde<V> serdeValue, TimeExtractor<K> timeExtractor) + { + this.store = Preconditions.checkNotNull(store); + keyValueSerdeManager = new AffixKeyValueSerdeManager<>(null, identifier, Preconditions.checkNotNull(serdeKey), Preconditions.checkNotNull(serdeValue)); + this.timeExtractor = timeExtractor; } public SpillableStateStore getStore() @@ -132,7 +150,7 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi return val; } - Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false)); + Slice valSlice = store.getSync(getBucket(key), keyValueSerdeManager.serializeDataKey(key, false)); if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { return null; @@ -219,12 +237,12 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi public void endWindow() { for (K key: cache.getChangedKeys()) { - store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true), + store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), keyValueSerdeManager.serializeValue(cache.get(key))); } for (K key: cache.getRemovedKeys()) { - store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE); + store.put(getBucket(key), keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE); } cache.endWindow(); keyValueSerdeManager.resetReadBuffer(); @@ -234,4 +252,9 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi public void teardown() { } + + private long getBucket(K key) + { + return timeExtractor != null ? 
timeExtractor.getTime(key) : bucket; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java index 0dfc411..221cd38 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java @@ -25,6 +25,7 @@ import java.util.NoSuchElementException; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.hadoop.classification.InterfaceStability; @@ -89,8 +90,6 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable } @NotNull - private SpillableStateStore store; - @NotNull private SpillableMapImpl<T, ListNode<T>> map; private T head; @@ -103,7 +102,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable public SpillableStateStore getStore() { - return store; + return map.getStore(); } /** @@ -118,9 +117,23 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable @NotNull SpillableStateStore store, @NotNull Serde<T> serde) { - this.store = Preconditions.checkNotNull(store); + map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, bucketId, serde, new ListNodeSerde<>(serde)); + } - map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde)); + /** + * Creates a {@link SpillableSetImpl}. + * {@link SpillableSetImpl} in the provided {@link SpillableStateStore}. + * @param prefix The Id of this {@link SpillableSetImpl}. 
+ * @param store The {@link SpillableStateStore} in which to spill to. + * @param serde The {@link Serde} to use when serializing and deserializing data. + * @param timeExtractor Extract time from the each element and use it to decide where the data goes. + */ + public SpillableSetImpl(@NotNull byte[] prefix, + @NotNull SpillableStateStore store, + @NotNull Serde<T> serde, + @NotNull TimeExtractor timeExtractor) + { + map = new SpillableMapImpl<>(Preconditions.checkNotNull(store), prefix, serde, new ListNodeSerde<>(serde), timeExtractor); } public void setSize(int size) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java index 76e47f2..fb88d9c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java @@ -27,10 +27,11 @@ import java.util.Set; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager; +import org.apache.apex.malhar.lib.utils.serde.AffixSerde; import org.apache.apex.malhar.lib.utils.serde.IntSerde; import org.apache.apex.malhar.lib.utils.serde.PairSerde; -import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -55,21 +56,46 @@ import com.datatorrent.netlet.util.Slice; public class SpillableSetMultimapImpl<K, V> 
implements Spillable.SpillableSetMultimap<K, V>, Spillable.SpillableComponent { + + private static class FixedTimeExtractor<V> implements TimeExtractor<V> + { + + private long fixedTime; + + private FixedTimeExtractor(long fixedTime) + { + this.fixedTime = fixedTime; + } + + private FixedTimeExtractor() + { + // For kryo + } + + @Override + public long getTime(V v) + { + return fixedTime; + } + + } + public static final int DEFAULT_BATCH_SIZE = 1000; public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>(); @NotNull - private SpillableMapImpl<Slice, Pair<Integer, V>> map; + private SpillableMapImpl<K, Pair<Integer, V>> map; private SpillableStateStore store; - private byte[] identifier; private long bucket; private Serde<V> valueSerde; private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>(); - protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager; - protected transient Context.OperatorContext context; + private TimeExtractor<K> timeExtractor = null; + private AffixKeyValueSerdeManager<K, V> keyValueSerdeManager; + private transient Context.OperatorContext context; + private SpillableSetMultimapImpl() { // for kryo @@ -81,8 +107,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul * @param identifier The Id of this {@link SpillableSetMultimapImpl}. * @param bucket The Id of the bucket used to store this * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}. - * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. - * @param serdeKey The {@link Serde} to use when serializing and deserializing values. + * @param keySerde The {@link Serde} to use when serializing and deserializing keys. + * @param valueSerde The {@link Serde} to use when serializing and deserializing values. 
*/ public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde, @@ -93,7 +119,32 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul this.valueSerde = Preconditions.checkNotNull(valueSerde); keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde); - map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde)); + map = new SpillableMapImpl<>(store, identifier, bucket, new AffixSerde<>(null, keySerde, META_KEY_SUFFIX), new PairSerde<>(new IntSerde(), valueSerde)); + } + + + /** + * Creates a {@link SpillableSetMultimapImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableSetMultimapImpl}. + * @param bucket The Id of the bucket used to store this + * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}. + * @param keySerde The {@link Serde} to use when serializing and deserializing keys. + * @param valueSerde The {@link Serde} to use when serializing and deserializing values. 
+ * @param timeExtractor The {@link TimeExtractor} to be used to retrieve time from key + */ + public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K> keySerde, + Serde<V> valueSerde, + TimeExtractor<K> timeExtractor) + { + this.store = Preconditions.checkNotNull(store); + this.bucket = bucket; + this.valueSerde = Preconditions.checkNotNull(valueSerde); + keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde); + this.timeExtractor = timeExtractor; + + map = new SpillableMapImpl<>(store, identifier, new AffixSerde<>(null, keySerde, META_KEY_SUFFIX), new PairSerde<>(new IntSerde(), valueSerde), timeExtractor); } public SpillableStateStore getStore() @@ -112,14 +163,23 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = cache.get(key); if (spillableSet == null) { - Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false)); + long keyTime = -1; + Pair<Integer, V> meta; + if (timeExtractor != null) { + keyTime = timeExtractor.getTime(key); + } + meta = map.get(key); if (meta == null) { return null; } Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false); - spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde); + if (timeExtractor != null) { + spillableSet = new SpillableSetImpl<>(keyPrefix.toByteArray(), store, valueSerde, new FixedTimeExtractor(keyTime)); + } else { + spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde); + } spillableSet.setSize(meta.getLeft()); spillableSet.setHead(meta.getRight()); spillableSet.setup(context); @@ -166,8 +226,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = getHelper((K)key); if (spillableSet != null) { cache.remove((K)key); - Slice keySlice = 
keyValueSerdeManager.serializeMetaKey((K)key, false); - map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead())); + map.put((K)key, new ImmutablePair<>(0, spillableSet.getHead())); spillableSet.clear(); removedSets.add(spillableSet); } @@ -199,8 +258,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul if (cache.contains((K)key)) { return true; } - Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false); - Pair<Integer, V> meta = map.get(keySlice); + Pair<Integer, V> meta = map.get((K)key); return meta != null && meta.getLeft() > 0; } @@ -227,7 +285,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = getHelper(key); if (spillableSet == null) { - spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde); + if (timeExtractor == null) { + spillableSet = new SpillableSetImpl<>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde); + } else { + spillableSet = new SpillableSetImpl<>(keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde, new FixedTimeExtractor(timeExtractor.getTime(key))); + } spillableSet.setup(context); cache.put(key, spillableSet); } @@ -304,8 +366,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul SpillableSetImpl<V> spillableSet = cache.get(key); spillableSet.endWindow(); - map.put(keyValueSerdeManager.serializeMetaKey(key, true), - new ImmutablePair<>(spillableSet.size(), spillableSet.getHead())); + map.put(key, new ImmutablePair<>(spillableSet.size(), spillableSet.getHead())); } for (SpillableSetImpl removedSet : removedSets) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java 
---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java new file mode 100644 index 0000000..207cb31 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedTimeUnifiedStateSpillableStateStore.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable.managed; + +import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; + +/** + * + */ +public class ManagedTimeUnifiedStateSpillableStateStore extends ManagedTimeUnifiedStateImpl implements SpillableStateStore +{ +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java index ef111b3..d41c494 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java @@ -180,10 +180,10 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind } if (windowKeyToValueMap == null) { - windowKeyToValueMap = scc.newSpillableMap(bucket, windowKeyPairSerde, valueSerde); + windowKeyToValueMap = scc.newSpillableMap(windowKeyPairSerde, valueSerde, new WindowKeyPairTimeExtractor()); } if (windowToKeysMap == null) { - windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde); + windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde, new WindowTimeExtractor()); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java 
b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java index 9a8a291..f9bbc17 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java @@ -133,7 +133,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe valueSerde = new GenericSerde<>(); } if (windowToDataMap == null) { - windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde); + windowToDataMap = scc.newSpillableMap(windowSerde, valueSerde, new WindowTimeExtractor()); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java new file mode 100644 index 0000000..ecf63a5 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowKeyPairTimeExtractor.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; +import org.apache.apex.malhar.lib.window.Window; +import org.apache.commons.lang3.tuple.Pair; + +/** + * A {@link TimeExtractor} to extract time from Pair of {@link Window} and key + * The type of key doesn't matter in this case, so it assumes object as the key type + */ +public class WindowKeyPairTimeExtractor<K> implements TimeExtractor<Pair<Window, K>> +{ + + private final WindowTimeExtractor windowTimeExtractor = new WindowTimeExtractor(); + + @Override + public long getTime(Pair<Window, K> windowKeyPair) + { + return windowTimeExtractor.getTime(windowKeyPair.getKey()); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java new file mode 100644 index 0000000..aee389a --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowTimeExtractor.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.impl; + +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; +import org.apache.apex.malhar.lib.window.Window; + +/** + * A {@link TimeExtractor} to extract time from {@link Window} + */ +public class WindowTimeExtractor implements TimeExtractor<Window> +{ + @Override + public long getTime(Window window) + { + return window.getBeginTimestamp() + window.getDurationMillis(); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java index a96a8fd..760bc5c 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java @@ -21,7 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; import org.apache.apex.malhar.lib.utils.serde.StringSerde; @@ -31,31 +33,46 @@ import com.datatorrent.api.DAG; import 
com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.util.KryoCloneUtils; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + +@RunWith(JUnitParamsRunner.class) public class SpillableMapImplTest { public static final byte[] ID1 = new byte[]{(byte)0}; public static final byte[] ID2 = new byte[]{(byte)1}; + public static final TestStringTimeExtractor TE = new TestStringTimeExtractor(); + + private SpillableStateStore store; + + private TimeExtractor<String> te = null; + + @Rule public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); - @Test - public void simpleGetAndPutTest() - { - InMemSpillableStateStore store = new InMemSpillableStateStore(); - simpleGetAndPutTestHelper(store); + private void setup(String opt) + { + if (opt.equals("InMem")) { + store = new InMemSpillableStateStore(); + te = null; + } else if (opt.equals("ManagedState")) { + store = testMeta.store; + te = null; + } else { + store = testMeta.timeStore; + te = TE; + } } @Test - public void simpleGetAndPutManagedStateTest() - { - simpleGetAndPutTestHelper(testMeta.store); - } - - private void simpleGetAndPutTestHelper(SpillableStateStore store) + @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"}) + public void simpleGetAndPutTest(String opt) { + setup(opt); SpillableMapImpl<String, String> map = createSpillableMap(store); store.setup(testMeta.operatorContext); @@ -73,15 +90,9 @@ public class SpillableMapImplTest Assert.assertEquals(3, map.size()); - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals("2", map.get("b")); - Assert.assertEquals("3", map.get("c")); - Assert.assertEquals(null, map.get("d")); + assertMultiEqualsFromMap(map, new String[]{"1", "2", "3", null}, new String[]{"a", "b", "c", "d"}); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - 
SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{null, null, null, null}); map.endWindow(); store.endWindow(); @@ -93,17 +104,11 @@ public class SpillableMapImplTest store.beginWindow(windowId); map.beginWindow(windowId); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{"1", "2", "3", null}); Assert.assertEquals(3, map.size()); - Assert.assertEquals("1", map.get("a")); - Assert.assertEquals("2", map.get("b")); - Assert.assertEquals("3", map.get("c")); - Assert.assertEquals(null, map.get("d")); + assertMultiEqualsFromMap(map, new String[]{"1", "2", "3", null}, new String[]{"a", "b", "c", "d"}); map.put("d", "4"); map.put("e", "5"); @@ -111,16 +116,9 @@ public class SpillableMapImplTest Assert.assertEquals(6, map.size()); - Assert.assertEquals("4", map.get("d")); - Assert.assertEquals("5", map.get("e")); - Assert.assertEquals("6", map.get("f")); + assertMultiEqualsFromMap(map, new String[]{"4", "5", "6"}, new String[]{"d", "e", "f"}); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f"}, ID1, new String[]{"1", "2", "3", null, null, null}); map.endWindow(); store.endWindow(); @@ -132,13 +130,8 @@ public class SpillableMapImplTest store.beginWindow(windowId); map.beginWindow(windowId); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - 
SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3"); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + + multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", "2", "3", "4", "5", "6", null}); map.endWindow(); store.endWindow(); @@ -150,28 +143,43 @@ public class SpillableMapImplTest store.teardown(); } - @Test - public void simpleRemoveTest() + private void multiValueCheck(String[] keys, byte[] samePrefix, String[] expectedVal) { - InMemSpillableStateStore store = new InMemSpillableStateStore(); - simpleRemoveTestHelper(store); + for (int i = 0; i < keys.length; i++) { + SpillableTestUtils.checkValue(store, _bid(keys[i], te), keys[i], samePrefix, expectedVal[i]); + } } + private void assertMultiEqualsFromMap(SpillableMapImpl<String, String> map, String[] expectedV, String[] keys) + { + for (int i = 0; i < expectedV.length; i++) { + Assert.assertEquals(expectedV[i], map.get(keys[i])); + } + } - @Test - public void simpleRemoveManagedStateTest() + private long _bid(String key, TimeExtractor<String> te) { - simpleRemoveTestHelper(testMeta.store); + if (te != null) { + return te.getTime(key); + } else { + return 0L; + } } - protected SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store) + private SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store) { - return new SpillableMapImpl<String, String>(store, ID1, 0L, new StringSerde(), - new StringSerde()); + if (te == null) { + return new SpillableMapImpl<>(store,ID1,0L,new StringSerde(), new StringSerde()); + } else { + return new SpillableMapImpl<>(store,ID1,new StringSerde(), new StringSerde(), te); + } } - private void simpleRemoveTestHelper(SpillableStateStore store) + @Test + 
@Parameters({"InMem","ManagedState","TimeUnifiedManagedState"}) + public void simpleRemoveTest(String opt) { + setup(opt); SpillableMapImpl<String, String> map = createSpillableMap(store); store.setup(testMeta.operatorContext); @@ -199,10 +207,7 @@ public class SpillableMapImplTest Assert.assertEquals(1, map.size()); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{null, null, null, null}); map.endWindow(); store.endWindow(); @@ -210,10 +215,7 @@ public class SpillableMapImplTest store.checkpointed(windowId); store.committed(windowId); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d"}, ID1, new String[]{"1", null, null, null}); windowId++; store.beginWindow(windowId); @@ -236,12 +238,7 @@ public class SpillableMapImplTest Assert.assertEquals("5", map.get("e")); Assert.assertEquals("6", map.get("f")); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f"}, ID1, new String[]{"1", null, null, null, null, null}); map.endWindow(); store.endWindow(); @@ -253,13 +250,7 @@ public class SpillableMapImplTest store.beginWindow(windowId); map.beginWindow(windowId); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - 
SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", null, null, "4", "5", "6", null}); map.remove("a"); map.remove("d"); @@ -271,13 +262,7 @@ public class SpillableMapImplTest Assert.assertEquals("6", map.get("f")); Assert.assertEquals(null, map.get("g")); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4"); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{"1", null, null, "4", "5", "6", null}); map.endWindow(); store.endWindow(); @@ -289,13 +274,7 @@ public class SpillableMapImplTest store.beginWindow(windowId); map.beginWindow(windowId); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "d", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5"); - SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6"); - SpillableTestUtils.checkValue(store, 0L, "g", ID1, null); + multiValueCheck(new String[]{"a", "b", "c", "d", "e", "f", "g"}, ID1, new String[]{null, null, null, null, "5", "6", null}); map.endWindow(); store.endWindow(); @@ -308,29 +287,21 @@ public class SpillableMapImplTest } @Test - public void 
multiMapPerBucketTest() - { - InMemSpillableStateStore store = new InMemSpillableStateStore(); - - multiMapPerBucketTestHelper(store); - } - - @Test - public void multiMapPerBucketManagedStateTest() - { - multiMapPerBucketTestHelper(testMeta.store); - } - - public void multiMapPerBucketTestHelper(SpillableStateStore store) + @Parameters({"InMem","ManagedState","TimeUnifiedManagedState"}) + public void multiMapPerBucketTest(String opt) { + setup(opt); StringSerde sss = new StringSerde(); - SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 0L, - new StringSerde(), - new StringSerde()); - SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 0L, - new StringSerde(), - new StringSerde()); + SpillableMapImpl<String, String> map1 = null; + SpillableMapImpl<String, String> map2 = null; + if (te == null) { + map1 = new SpillableMapImpl<>(store, ID1, 0L, sss, sss); + map2 = new SpillableMapImpl<>(store, ID2, 0L, sss, sss); + } else { + map1 = new SpillableMapImpl<>(store, ID1, sss, sss, te); + map2 = new SpillableMapImpl<>(store, ID2, sss, sss, te); + } store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); @@ -372,12 +343,9 @@ public class SpillableMapImplTest map1.beginWindow(windowId); map2.beginWindow(windowId); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2"); + multiValueCheck(new String[]{"a", "b"}, ID1, new String[]{"1", "2"}); - SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); - SpillableTestUtils.checkValue(store, 0L, "b", ID2, null); - SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3"); + multiValueCheck(new String[]{"a", "b", "c"}, ID2, new String[]{"a1", null, "3"}); map1.remove("a"); @@ -395,8 +363,8 @@ public class SpillableMapImplTest map1.beginWindow(windowId); map2.beginWindow(windowId); - SpillableTestUtils.checkValue(store, 0L, "a", ID1, null); - SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1"); + 
multiValueCheck(new String[]{"a"}, ID1, new String[]{null}); + multiValueCheck(new String[]{"a"}, ID2, new String[]{"a1"}); map1.endWindow(); map2.endWindow(); @@ -410,18 +378,22 @@ public class SpillableMapImplTest } @Test - public void recoveryWithManagedStateTest() throws Exception + @Parameters({"ManagedState","TimeUnifiedManagedState"}) + public void recoveryWithManagedStateTest(String opt) throws Exception { + setup(opt); StringSerde sss = new StringSerde(); + SpillableMapImpl<String, String> map1 = null; + if (te == null) { + map1 = new SpillableMapImpl<>(store, ID1, 0L, sss, sss); + } else { + map1 = new SpillableMapImpl<>(store, ID1, sss, sss, te); + } - SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(testMeta.store, ID1, 0L, - new StringSerde(), - new StringSerde()); - - testMeta.store.setup(testMeta.operatorContext); + store.setup(testMeta.operatorContext); map1.setup(testMeta.operatorContext); - testMeta.store.beginWindow(0); + store.beginWindow(0); map1.beginWindow(0); map1.put("x", "1"); map1.put("y", "2"); @@ -429,9 +401,9 @@ public class SpillableMapImplTest map1.put("zz", "33"); Assert.assertEquals(4, map1.size()); map1.endWindow(); - testMeta.store.endWindow(); + store.endWindow(); - testMeta.store.beginWindow(1); + store.beginWindow(1); map1.beginWindow(1); Assert.assertEquals(4, map1.size()); map1.put("x", "4"); @@ -439,13 +411,13 @@ public class SpillableMapImplTest map1.remove("zz"); Assert.assertEquals(3, map1.size()); map1.endWindow(); - testMeta.store.endWindow(); - testMeta.store.beforeCheckpoint(1); - testMeta.store.checkpointed(1); + store.endWindow(); + store.beforeCheckpoint(1); + store.checkpointed(1); SpillableMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1); - testMeta.store.beginWindow(2); + store.beginWindow(2); map1.beginWindow(2); Assert.assertEquals(3, map1.size()); map1.put("x", "6"); @@ -453,11 +425,11 @@ public class SpillableMapImplTest map1.put("w", "8"); Assert.assertEquals(4, 
map1.size()); map1.endWindow(); - testMeta.store.endWindow(); + store.endWindow(); // simulating crash here map1.teardown(); - testMeta.store.teardown(); + store.teardown(); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java index d0343e1..3f078cf 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java @@ -25,6 +25,7 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; import org.apache.apex.malhar.lib.utils.serde.StringSerde; @@ -37,24 +38,38 @@ public class SpillableSetImplTest @Rule public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + public TimeExtractor<String> te = null; + @Test - public void simpleAddGetAndSetTest1() + public void simpleAddGetAndSetTest() { InMemSpillableStateStore store = new InMemSpillableStateStore(); - simpleAddGetAndSetTest1Helper(store); + simpleAddGetAndSetTestHelper(store); + } + + @Test + public void simpleAddGetAndSetTimeUnifiedManagedStateTest() + { + te = new TestStringTimeExtractor(); + simpleAddGetAndSetTestHelper(testMeta.timeStore); } @Test - public void simpleAddGetAndSetManagedStateTest1() + public void simpleAddGetAndSetManagedStateTest() { - 
simpleAddGetAndSetTest1Helper(testMeta.store); + simpleAddGetAndSetTestHelper(testMeta.store); } - public void simpleAddGetAndSetTest1Helper(SpillableStateStore store) + public void simpleAddGetAndSetTestHelper(SpillableStateStore store) { - SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde()); + SpillableSetImpl<String> set; + if (te == null) { + set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde()); + } else { + set = new SpillableSetImpl<>(ID1, store, new StringSerde(), te); + } store.setup(testMeta.operatorContext); set.setup(testMeta.operatorContext); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java index 2f80628..bc1783c 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java @@ -26,6 +26,7 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore; import org.apache.apex.malhar.lib.utils.serde.Serde; import org.apache.apex.malhar.lib.utils.serde.StringSerde; @@ -46,6 +47,8 @@ public class SpillableSetMultimapImplTest @Rule public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + public TimeExtractor<String> te = null; + @Test public void simpleMultiKeyTest() { @@ -60,10 +63,22 @@ public class SpillableSetMultimapImplTest simpleMultiKeyTestHelper(testMeta.store); } + @Test + public 
void simpleMultiKeyTimeUnifiedManagedStateTest() + { + te = new TestStringTimeExtractor(); + simpleMultiKeyTestHelper(testMeta.timeStore); + } + + public void simpleMultiKeyTestHelper(SpillableStateStore store) { - SpillableSetMultimapImpl<String, String> map = - new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde()); + SpillableSetMultimapImpl<String, String> map = null; + if (te == null) { + map = new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde()); + } else { + map = new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde(), te); + } store.setup(testMeta.operatorContext); map.setup(testMeta.operatorContext); @@ -296,7 +311,7 @@ public class SpillableSetMultimapImplTest store.endWindow(); } - protected Serde<String> createStringSerde() + private Serde<String> createStringSerde() { return new StringSerde(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java index d72b1f9..a312f04 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java @@ -27,6 +27,7 @@ import org.junit.runner.Description; import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils; import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore; +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedTimeUnifiedStateSpillableStateStore; import org.apache.apex.malhar.lib.utils.serde.CollectionSerde; import org.apache.apex.malhar.lib.utils.serde.Serde; 
import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; @@ -57,6 +58,7 @@ public class SpillableTestUtils public static class TestMeta extends TestWatcher { public ManagedStateSpillableStateStore store; + public ManagedTimeUnifiedStateSpillableStateStore timeStore; public Context.OperatorContext operatorContext; public String applicationPath; @@ -65,8 +67,10 @@ public class SpillableTestUtils { TestUtils.deleteTargetTestClassFolder(description); store = new ManagedStateSpillableStateStore(); + timeStore = new ManagedTimeUnifiedStateSpillableStateStore(); applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); ((FileAccessFSImpl)store.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + ((FileAccessFSImpl)timeStore.getFileAccess()).setBasePath(applicationPath + "/" + "time_bucket_data"); operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java new file mode 100644 index 0000000..438555f --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TestStringTimeExtractor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.apache.apex.malhar.lib.state.managed.TimeExtractor; + +/** + * A TimeExtractor for Tests + * Get the time value from ASCII code of the first character + */ +public class TestStringTimeExtractor implements TimeExtractor<String> +{ + static long BASETIME = System.currentTimeMillis(); + @Override + public long getTime(String s) + { + return s.toCharArray()[0] * 1000 + BASETIME - 7200000; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java index a44e454..afc5227 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java @@ -38,14 +38,16 @@ public class SpillableWindowedStorageTest @Rule public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); + public static long BASETIME = System.currentTimeMillis(); + @Test public void testWindowedPlainStorage() { - SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store); + SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore); 
SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>(); - Window window1 = new Window.TimeWindow<>(1000, 10); - Window window2 = new Window.TimeWindow<>(1010, 10); - Window window3 = new Window.TimeWindow<>(1020, 10); + Window window1 = new Window.TimeWindow<>(BASETIME + 1000, 10); + Window window2 = new Window.TimeWindow<>(BASETIME + 1010, 10); + Window window3 = new Window.TimeWindow<>(BASETIME + 1020, 10); storage.setSpillableComplexComponent(sccImpl); /* @@ -103,11 +105,11 @@ public class SpillableWindowedStorageTest @Test public void testWindowedKeyedStorage() { - SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.store); + SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore); SpillableWindowedKeyedStorage<String, Integer> storage = new SpillableWindowedKeyedStorage<>(); - Window window1 = new Window.TimeWindow<>(1000, 10); - Window window2 = new Window.TimeWindow<>(1010, 10); - Window window3 = new Window.TimeWindow<>(1020, 10); + Window window1 = new Window.TimeWindow<>(BASETIME + 1000, 10); + Window window2 = new Window.TimeWindow<>(BASETIME + 1010, 10); + Window window3 = new Window.TimeWindow<>(BASETIME + 1020, 10); storage.setSpillableComplexComponent(sccImpl); /* @@ -118,7 +120,6 @@ public class SpillableWindowedStorageTest storage.setup(testMeta.operatorContext); storage.getSpillableComplexComponent().setup(testMeta.operatorContext); - sccImpl.beginWindow(1000); storage.put(window1, "x", 1); storage.put(window2, "x", 2); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/16edf306/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java index 4a1cef0..f898e2d 100644 --- 
a/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/WindowedOperatorTest.java @@ -57,6 +57,9 @@ import com.datatorrent.lib.util.KeyValPair; @RunWith(Parameterized.class) public class WindowedOperatorTest { + + public static final long BASE = (System.currentTimeMillis() / 1000) * 1000; + @Parameterized.Parameters public static Collection<Object[]> testParameters() { @@ -90,7 +93,7 @@ public class WindowedOperatorTest { WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = new WindowedOperatorImpl<>(); if (useSpillable) { - sccImpl = new SpillableComplexComponentImpl(testMeta.store); + sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore); // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys. windowStateStorage = new InMemoryWindowedStorage<>(); SpillableWindowedPlainStorage<MutableLong> pds = new SpillableWindowedPlainStorage<>(); @@ -116,7 +119,7 @@ public class WindowedOperatorTest { KeyedWindowedOperatorImpl<String, Long, MutableLong, Long> windowedOperator = new KeyedWindowedOperatorImpl<>(); if (useSpillable) { - sccImpl = new SpillableComplexComponentImpl(testMeta.store); + sccImpl = new SpillableComplexComponentImpl(testMeta.timeStore); // TODO: We don't yet support Spillable data structures for window state storage because SpillableMapImpl does not yet support iterating over all keys. 
windowStateStorage = new InMemoryWindowedStorage<>(); if (forSession) { @@ -183,7 +186,7 @@ public class WindowedOperatorTest windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L)); Assert.assertEquals("There should be exactly one window in the storage", 1, plainDataStorage.size()); Assert.assertEquals("There should be exactly one window in the storage", 1, windowStateStorage.size()); @@ -192,23 +195,22 @@ public class WindowedOperatorTest WindowState windowState = entry.getValue(); Assert.assertEquals(-1, windowState.watermarkArrivalTime); Assert.assertEquals(2L, plainDataStorage.get(window).longValue()); - - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L)); Assert.assertEquals(5L, plainDataStorage.get(window).longValue()); - windowedOperator.processWatermark(new WatermarkImpl(1200)); + windowedOperator.processWatermark(new WatermarkImpl(BASE + 1200)); windowedOperator.endWindow(); Assert.assertTrue(windowState.watermarkArrivalTime >= 0); Assert.assertEquals("We should get one watermark tuple", 1, controlSink.getCount(false)); windowedOperator.beginWindow(2); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(900L, 4L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 900L, 4L)); Assert.assertEquals("Late but not too late", 9L, plainDataStorage.get(window).longValue()); - windowedOperator.processWatermark(new WatermarkImpl(3000)); + windowedOperator.processWatermark(new WatermarkImpl(BASE + 3000)); windowedOperator.endWindow(); Assert.assertEquals("We should get two watermark tuples", 2, controlSink.getCount(false)); windowedOperator.beginWindow(3); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(120L, 5L)); // this tuple should be dropped + 
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 120L, 5L)); // this tuple should be dropped Assert.assertEquals("The window should be dropped because it's too late", 0, plainDataStorage.size()); Assert.assertEquals("The window should be dropped because it's too late", 0, windowStateStorage.size()); windowedOperator.endWindow(); @@ -238,8 +240,8 @@ public class WindowedOperatorTest windowedOperator.output.setSink(sink); windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L)); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L)); windowedOperator.endWindow(); Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); windowedOperator.beginWindow(2); @@ -251,11 +253,11 @@ public class WindowedOperatorTest Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue()); sink.collectedTuples.clear(); windowedOperator.beginWindow(4); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, 4L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, 4L)); windowedOperator.endWindow(); Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); windowedOperator.beginWindow(5); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, 5L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, 5L)); windowedOperator.endWindow(); switch (accumulationMode) { case ACCUMULATING: @@ -337,8 +339,8 @@ public class WindowedOperatorTest windowedOperator.output.setSink(sink); windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L)); - 
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, 2L)); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, 3L)); windowedOperator.endWindow(); Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); windowedOperator.beginWindow(2); @@ -376,7 +378,7 @@ public class WindowedOperatorTest WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.GlobalWindow()); windowedOperator.setup(testMeta.operatorContext); - Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L)); + Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); Assert.assertEquals(Window.GlobalWindow.INSTANCE, windows.iterator().next()); @@ -389,11 +391,11 @@ public class WindowedOperatorTest WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setup(testMeta.operatorContext); - Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1100L, 2L)); + Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1100L, 2L)); Collection<? 
extends Window> windows = windowedValue.getWindows(); Assert.assertEquals(1, windows.size()); Window window = windows.iterator().next(); - Assert.assertEquals(1000, window.getBeginTimestamp()); + Assert.assertEquals(BASE + 1000, window.getBeginTimestamp()); Assert.assertEquals(1000, window.getDurationMillis()); } @@ -403,19 +405,19 @@ public class WindowedOperatorTest WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createDefaultWindowedOperator(); windowedOperator.setWindowOption(new WindowOption.SlidingTimeWindows(Duration.millis(1000), Duration.millis(200))); windowedOperator.setup(testMeta.operatorContext); - Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(1600L, 2L)); + Tuple.WindowedTuple<Long> windowedValue = windowedOperator.getWindowedValue(new Tuple.TimestampedTuple<>(BASE + 1600L, 2L)); Collection<? extends Window> windows = windowedValue.getWindows(); Window[] winArray = windows.toArray(new Window[]{}); Assert.assertEquals(5, winArray.length); - Assert.assertEquals(800, winArray[0].getBeginTimestamp()); + Assert.assertEquals(BASE + 800, winArray[0].getBeginTimestamp()); Assert.assertEquals(1000, winArray[0].getDurationMillis()); - Assert.assertEquals(1000, winArray[1].getBeginTimestamp()); + Assert.assertEquals(BASE + 1000, winArray[1].getBeginTimestamp()); Assert.assertEquals(1000, winArray[1].getDurationMillis()); - Assert.assertEquals(1200, winArray[2].getBeginTimestamp()); + Assert.assertEquals(BASE + 1200, winArray[2].getBeginTimestamp()); Assert.assertEquals(1000, winArray[2].getDurationMillis()); - Assert.assertEquals(1400, winArray[3].getBeginTimestamp()); + Assert.assertEquals(BASE + 1400, winArray[3].getBeginTimestamp()); Assert.assertEquals(1000, winArray[3].getDurationMillis()); - Assert.assertEquals(1600, winArray[4].getBeginTimestamp()); + Assert.assertEquals(BASE + 1600, winArray[4].getBeginTimestamp()); Assert.assertEquals(1000, winArray[4].getDurationMillis()); 
windowedOperator.teardown(); } @@ -430,14 +432,14 @@ public class WindowedOperatorTest windowedOperator.output.setSink((Sink<Object>)(Sink)sink); windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); - Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(1100L, new KeyValPair<>("a", 2L)); + Tuple<KeyValPair<String, Long>> tuple = new Tuple.TimestampedTuple<>(BASE + 1100L, new KeyValPair<>("a", 2L)); windowedOperator.processTuple(tuple); Assert.assertEquals(1, sink.getCount(false)); Tuple.WindowedTuple<KeyValPair<String, Long>> out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0); Assert.assertEquals(1, out.getWindows().size()); Window.SessionWindow<String> window1 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); - Assert.assertEquals(1100L, window1.getBeginTimestamp()); + Assert.assertEquals(BASE + 1100L, window1.getBeginTimestamp()); Assert.assertEquals(2000, window1.getDurationMillis()); Assert.assertEquals("a", window1.getKey()); Assert.assertEquals("a", out.getValue().getKey()); @@ -445,7 +447,7 @@ public class WindowedOperatorTest sink.clear(); // extending an existing session window - tuple = new Tuple.TimestampedTuple<>(2000L, new KeyValPair<>("a", 3L)); + tuple = new Tuple.TimestampedTuple<>(BASE + 2000L, new KeyValPair<>("a", 3L)); windowedOperator.processTuple(tuple); Assert.assertEquals(2, sink.getCount(false)); @@ -460,27 +462,27 @@ public class WindowedOperatorTest out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(1); Window.SessionWindow<String> window2 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); - Assert.assertEquals(1100L, window2.getBeginTimestamp()); + Assert.assertEquals(BASE + 1100L, window2.getBeginTimestamp()); Assert.assertEquals(2900, window2.getDurationMillis()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(5L, out.getValue().getValue().longValue()); sink.clear(); // a 
separate session window - tuple = new Tuple.TimestampedTuple<>(5000L, new KeyValPair<>("a", 4L)); + tuple = new Tuple.TimestampedTuple<>(BASE + 5000L, new KeyValPair<>("a", 4L)); windowedOperator.processTuple(tuple); Assert.assertEquals(1, sink.getCount(false)); out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(0); Assert.assertEquals(1, out.getWindows().size()); Window.SessionWindow<String> window3 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); - Assert.assertEquals(5000L, window3.getBeginTimestamp()); + Assert.assertEquals(BASE + 5000L, window3.getBeginTimestamp()); Assert.assertEquals(2000, window3.getDurationMillis()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(4L, out.getValue().getValue().longValue()); sink.clear(); // session window merging - tuple = new Tuple.TimestampedTuple<>(3500L, new KeyValPair<>("a", 3L)); + tuple = new Tuple.TimestampedTuple<>(BASE + 3500L, new KeyValPair<>("a", 3L)); windowedOperator.processTuple(tuple); Assert.assertEquals(3, sink.getCount(false)); @@ -509,7 +511,7 @@ public class WindowedOperatorTest out = (Tuple.WindowedTuple<KeyValPair<String, Long>>)sink.collectedTuples.get(2); Assert.assertEquals(1, out.getWindows().size()); Window.SessionWindow<String> window4 = (Window.SessionWindow<String>)out.getWindows().iterator().next(); - Assert.assertEquals(1100L, window4.getBeginTimestamp()); + Assert.assertEquals(BASE + 1100L, window4.getBeginTimestamp()); Assert.assertEquals(5900, window4.getDurationMillis()); Assert.assertEquals("a", out.getValue().getKey()); Assert.assertEquals(12L, out.getValue().getValue().longValue()); @@ -525,14 +527,14 @@ public class WindowedOperatorTest windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000))); windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L))); - 
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("a", 3L))); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 4L))); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(150L, new KeyValPair<>("b", 5L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("a", 3L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 4L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 150L, new KeyValPair<>("b", 5L))); windowedOperator.endWindow(); Assert.assertEquals(1, keyedDataStorage.size()); - Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "a").longValue()); - Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(0, 1000), "b").longValue()); + Assert.assertEquals(5L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "a").longValue()); + Assert.assertEquals(9L, keyedDataStorage.get(new Window.TimeWindow(BASE, 1000), "b").longValue()); windowedOperator.teardown(); } @@ -559,10 +561,10 @@ public class WindowedOperatorTest windowedOperator.output.setSink((Sink<Object>)(Sink)sink); windowedOperator.setup(testMeta.operatorContext); windowedOperator.beginWindow(1); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, new KeyValPair<>("a", 2L))); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, new KeyValPair<>("b", 3L))); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("b", 5L))); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("a", 4L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 100L, new KeyValPair<>("a", 2L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 200L, new KeyValPair<>("b", 3L))); + 
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, new KeyValPair<>("b", 5L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("a", 4L))); windowedOperator.endWindow(); Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); windowedOperator.beginWindow(2); @@ -581,11 +583,11 @@ public class WindowedOperatorTest } sink.collectedTuples.clear(); windowedOperator.beginWindow(4); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, new KeyValPair<>("a", 8L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 400L, new KeyValPair<>("a", 8L))); windowedOperator.endWindow(); Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty()); windowedOperator.beginWindow(5); - windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, new KeyValPair<>("b", 9L))); + windowedOperator.processTuple(new Tuple.TimestampedTuple<>(BASE + 300L, new KeyValPair<>("b", 9L))); windowedOperator.endWindow(); Map<String, Long> map = new HashMap<>(); switch (accumulationMode) {
