APEXMALHAR-2267 #resolve renamed spillable data structures to remove the word "Byte"
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f5f1943d Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f5f1943d Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f5f1943d Branch: refs/heads/master Commit: f5f1943d2bcc2b61240fab649828c1aaf520d22b Parents: c19c80d Author: David Yan <[email protected]> Authored: Fri Sep 23 12:46:20 2016 -0700 Committer: David Yan <[email protected]> Committed: Tue Sep 27 14:26:46 2016 -0700 ---------------------------------------------------------------------- .../lib/join/AbstractInnerJoinOperator.java | 12 +- .../AbstractManagedStateInnerJoinOperator.java | 4 +- .../managed/ManagedTimeStateMultiValue.java | 4 +- .../malhar/lib/state/spillable/Spillable.java | 21 +- .../state/spillable/SpillableArrayListImpl.java | 6 +- .../SpillableArrayListMultimapImpl.java | 310 ++++++++++++ .../SpillableByteArrayListMultimapImpl.java | 310 ------------ .../state/spillable/SpillableByteMapImpl.java | 237 --------- .../spillable/SpillableComplexComponent.java | 86 ++-- .../SpillableComplexComponentImpl.java | 24 +- .../lib/state/spillable/SpillableMapImpl.java | 237 +++++++++ .../lib/state/spillable/SpillableSetImpl.java | 4 +- .../spillable/SpillableSetMultimapImpl.java | 4 +- .../impl/SpillableWindowedKeyedStorage.java | 4 +- .../impl/SpillableWindowedPlainStorage.java | 4 +- .../SpillableArrayListMultimapImplTest.java | 370 ++++++++++++++ .../SpillableByteArrayListMultimapImplTest.java | 371 -------------- .../spillable/SpillableByteMapImplTest.java | 484 ------------------- .../SpillableComplexComponentImplTest.java | 2 +- .../state/spillable/SpillableMapImplTest.java | 484 +++++++++++++++++++ .../spillable/SpillableSetMultimapImplTest.java | 3 +- .../malhar/lib/window/WindowedOperatorTest.java | 4 +- 22 files changed, 1495 insertions(+), 1490 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java index 816ca58..c1ebdd5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java @@ -88,8 +88,8 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator private boolean isLeftKeyPrimary = false; private boolean isRightKeyPrimary = false; protected SpillableComplexComponent component; - protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data; - protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data; + protected Spillable.SpillableListMultimap<K,T> stream1Data; + protected Spillable.SpillableListMultimap<K,T> stream2Data; /** * Process the tuple which are received from input ports with the following steps: @@ -103,12 +103,12 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator */ protected void processTuple(T tuple, boolean isStream1Data) { - Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; + Spillable.SpillableListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; K key = extractKey(tuple,isStream1Data); if (!store.put(key, tuple)) { return; } - Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data; + Spillable.SpillableListMultimap<K, T> valuestore = isStream1Data ? 
stream2Data : stream1Data; joinStream(tuple,isStream1Data, valuestore.get(key)); } @@ -210,8 +210,8 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator */ public void createStores() { - stream1Data = component.newSpillableByteArrayListMultimap(0,null,null); - stream2Data = component.newSpillableByteArrayListMultimap(0,null,null); + stream1Data = component.newSpillableArrayListMultimap(0,null,null); + stream2Data = component.newSpillableArrayListMultimap(0,null,null); } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java index 8b19ebc..c82c3e3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java @@ -93,13 +93,13 @@ public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac @Override protected void processTuple(T tuple, boolean isStream1Data) { - Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; + Spillable.SpillableListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data; K key = extractKey(tuple,isStream1Data); long timeBucket = extractTime(tuple,isStream1Data); if (!((ManagedTimeStateMultiValue)store).put(key, tuple,timeBucket)) { return; } - Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data; + Spillable.SpillableListMultimap<K, T> valuestore = isStream1Data ? 
stream2Data : stream1Data; Future<List> future = ((ManagedTimeStateMultiValue)valuestore).getAsync(key); if (future.isDone()) { try { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java index 3ca43a4..beeeb4e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java @@ -42,7 +42,7 @@ import com.datatorrent.lib.codec.KryoSerializableStreamCodec; import com.datatorrent.netlet.util.Slice; /** - * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator. + * Concrete implementation of SpillableListMultimap which is needed for join operator. * * <b>Properties:</b><br> * <b>isKeyContainsMultiValue</b>: Specifies whether the key has multiple value or not. 
<br> @@ -52,7 +52,7 @@ import com.datatorrent.netlet.util.Slice; * @since 3.5.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V> +public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListMultimap<K,V> { private transient StreamCodec streamCodec = null; private boolean isKeyContainsMultiValue = false; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java index 849389b..6b765a8 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java @@ -38,13 +38,13 @@ import com.datatorrent.api.Context.OperatorContext; public interface Spillable { /** - * This represents a spillable {@link java.util.List}. The underlying implementation - * of this list is similar to that of an {@link java.util.ArrayList}. Users that receive an + * This represents a spillable {@link java.util.List}. Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. - * @param <T> The type of the data stored in the {@link SpillableArrayList}. + * + * @param <T> The type of the data stored in the {@link SpillableList}. */ - interface SpillableArrayList<T> extends List<T> + interface SpillableList<T> extends List<T> { } @@ -52,6 +52,7 @@ public interface Spillable * This represents a spillable {@link java.util.Set}. 
Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. + * * @param <T> The type of the data stored in the {@link SpillableSet}. */ interface SpillableSet<T> extends Set<T> @@ -64,10 +65,11 @@ public interface Spillable * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. + * * @param <K> The type of the keys. * @param <V> The type of the values. */ - interface SpillableByteMap<K, V> extends Map<K, V> + interface SpillableMap<K, V> extends Map<K, V> { } @@ -77,10 +79,11 @@ public interface Spillable * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. + * * @param <K> The type of the keys. * @param <V> The type of the values. */ - interface SpillableByteArrayListMultimap<K, V> extends ListMultimap<K, V> + interface SpillableListMultimap<K, V> extends ListMultimap<K, V> { } @@ -90,6 +93,7 @@ public interface Spillable * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. + * * @param <K> The type of the keys. * @param <V> The type of the values. */ @@ -102,8 +106,10 @@ public interface Spillable * some assumptions about serialization and equality. Consider two elements T1 and T2. The assumption is * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs to the data structure. 
+ * + * @param <T> The type of the data stored in the set. */ - interface SpillableByteMultiset<T> extends Multiset<T> + interface SpillableMultiset<T> extends Multiset<T> { } @@ -111,6 +117,7 @@ public interface Spillable * This represents a spillable {@link java.util.Queue} implementation. Users that receive an * implementation of this interface don't need to worry about propagating operator call-backs * to the data structure. + * * @param <T> The type of the data stored in the queue. */ interface SpillableQueue<T> extends Queue<T> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java index 4ea1923..a59872c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java @@ -47,7 +47,7 @@ import com.datatorrent.netlet.util.Slice; */ @DefaultSerializer(FieldSerializer.class) @InterfaceStability.Evolving -public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent +public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Spillable.SpillableComponent { public static final int DEFAULT_BATCH_SIZE = 1000; @@ -60,7 +60,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T @NotNull private Serde<T, Slice> serde; @NotNull - private SpillableByteMapImpl<Integer, List<T>> map; + private SpillableMapImpl<Integer, List<T>> map; private boolean sizeCached = false; private int size; @@ -93,7 +93,7 @@ public class SpillableArrayListImpl<T> implements 
Spillable.SpillableArrayList<T this.store = Preconditions.checkNotNull(store); this.serde = Preconditions.checkNotNull(serde); - map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), + map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class)); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java new file mode 100644 index 0000000..0944583 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}. + * + * @since 3.5.0 + */ +@DefaultSerializer(FieldSerializer.class) +@InterfaceStability.Evolving +public class SpillableArrayListMultimapImpl<K, V> implements Spillable.SpillableListMultimap<K, V>, + Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>(); + private transient boolean isRunning = false; + private transient boolean isInWindow = false; + + private int batchSize = DEFAULT_BATCH_SIZE; + @NotNull + private SpillableMapImpl<Slice, Integer> map; + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; + private Serde<V, Slice> serdeValue; + + private SpillableArrayListMultimapImpl() + { + // for kryo + } + + /** + * Creates a {@link SpillableArrayListMultimapImpl}. 
+ * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableArrayListMultimapImpl}. + * @param bucket The Id of the bucket used to store this + * {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}. + * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. + * @param serdeKey The {@link Serde} to use when serializing and deserializing values. + */ + public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + this.store = Preconditions.checkNotNull(store); + this.identifier = Preconditions.checkNotNull(identifier); + this.bucket = bucket; + this.serdeKey = Preconditions.checkNotNull(serdeKey); + this.serdeValue = Preconditions.checkNotNull(serdeValue); + + map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice()); + } + + public SpillableStateStore getStore() + { + return store; + } + + @Override + public List<V> get(@Nullable K key) + { + return getHelper(key); + } + + private SpillableArrayListImpl<V> getHelper(@Nullable K key) + { + SpillableArrayListImpl<V> spillableArrayList = cache.get(key); + + if (spillableArrayList == null) { + Slice keySlice = serdeKey.serialize(key); + Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX)); + + if (size == null) { + return null; + } + + Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice); + spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); + spillableArrayList.setSize(size); + } + + cache.put(key, spillableArrayList); + + return spillableArrayList; + } + + @Override + public Set<K> keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Multiset<K> keys() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<V> 
values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<Map.Entry<K, V>> entries() + { + throw new UnsupportedOperationException(); + } + + @Override + public List<V> removeAll(@Nullable Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys + return map.size(); + } + + @Override + public boolean isEmpty() + { + return map.isEmpty(); + } + + @Override + public boolean containsKey(@Nullable Object key) + { + return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), + SIZE_KEY_SUFFIX)); + } + + @Override + public boolean containsValue(@Nullable Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsEntry(@Nullable Object key, @Nullable Object value) + { + SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key); + if (spillableArrayList == null) { + return false; + } + for (int i = 0; i < spillableArrayList.size(); i++) { + V v = spillableArrayList.get(i); + if (v == null) { + if (value == null) { + return true; + } + } else { + if (v.equals(value)) { + return true; + } + } + } + return false; + } + + @Override + public boolean put(@Nullable K key, @Nullable V value) + { + SpillableArrayListImpl<V> spillableArrayList = getHelper(key); + + if (spillableArrayList == null) { + Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key)); + spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); + + cache.put(key, spillableArrayList); + } + + spillableArrayList.add(value); + return true; + } + + @Override + public boolean remove(@Nullable Object key, @Nullable Object value) + { + throw new 
UnsupportedOperationException(); + } + + @Override + public boolean putAll(@Nullable K key, Iterable<? extends V> values) + { + boolean changed = false; + + for (V value: values) { + changed |= put(key, value); + } + + return changed; + } + + @Override + public boolean putAll(Multimap<? extends K, ? extends V> multimap) + { + boolean changed = false; + + for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) { + changed |= put(entry.getKey(), entry.getValue()); + } + + return changed; + } + + @Override + public List<V> replaceValues(K key, Iterable<? extends V> values) + { + throw new UnsupportedOperationException(); + } + + @Override + public Map<K, Collection<V>> asMap() + { + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Context.OperatorContext context) + { + map.setup(context); + isRunning = true; + } + + @Override + public void beginWindow(long windowId) + { + map.beginWindow(windowId); + isInWindow = true; + } + + @Override + public void endWindow() + { + isInWindow = false; + for (K key: cache.getChangedKeys()) { + + SpillableArrayListImpl<V> spillableArrayList = cache.get(key); + spillableArrayList.endWindow(); + + Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX), + spillableArrayList.size()); + } + + Preconditions.checkState(cache.getRemovedKeys().isEmpty()); + cache.endWindow(); + map.endWindow(); + } + + @Override + public void teardown() + { + isRunning = false; + map.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java deleted file mode 100644 
index c0466bd..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java +++ /dev/null @@ -1,310 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.spillable; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; -import javax.validation.constraints.NotNull; - -import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde; -import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; -import org.apache.apex.malhar.lib.utils.serde.SliceUtils; -import org.apache.hadoop.classification.InterfaceStability; - -import com.esotericsoftware.kryo.DefaultSerializer; -import com.esotericsoftware.kryo.serializers.FieldSerializer; -import com.google.common.base.Preconditions; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multiset; - -import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; - -/** - * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}. 
- * - * @since 3.5.0 - */ -@DefaultSerializer(FieldSerializer.class) -@InterfaceStability.Evolving -public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>, - Spillable.SpillableComponent -{ - public static final int DEFAULT_BATCH_SIZE = 1000; - public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; - - private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>(); - private transient boolean isRunning = false; - private transient boolean isInWindow = false; - - private int batchSize = DEFAULT_BATCH_SIZE; - @NotNull - private SpillableByteMapImpl<Slice, Integer> map; - private SpillableStateStore store; - private byte[] identifier; - private long bucket; - private Serde<K, Slice> serdeKey; - private Serde<V, Slice> serdeValue; - - private SpillableByteArrayListMultimapImpl() - { - // for kryo - } - - /** - * Creates a {@link SpillableByteArrayListMultimapImpl}. - * @param store The {@link SpillableStateStore} in which to spill to. - * @param identifier The Id of this {@link SpillableByteArrayListMultimapImpl}. - * @param bucket The Id of the bucket used to store this - * {@link SpillableByteArrayListMultimapImpl} in the provided {@link SpillableStateStore}. - * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. - * @param serdeKey The {@link Serde} to use when serializing and deserializing values. 
- */ - public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, - Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) - { - this.store = Preconditions.checkNotNull(store); - this.identifier = Preconditions.checkNotNull(identifier); - this.bucket = bucket; - this.serdeKey = Preconditions.checkNotNull(serdeKey); - this.serdeValue = Preconditions.checkNotNull(serdeValue); - - map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice()); - } - - public SpillableStateStore getStore() - { - return store; - } - - @Override - public List<V> get(@Nullable K key) - { - return getHelper(key); - } - - private SpillableArrayListImpl<V> getHelper(@Nullable K key) - { - SpillableArrayListImpl<V> spillableArrayList = cache.get(key); - - if (spillableArrayList == null) { - Slice keySlice = serdeKey.serialize(key); - Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX)); - - if (size == null) { - return null; - } - - Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice); - spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); - spillableArrayList.setSize(size); - } - - cache.put(key, spillableArrayList); - - return spillableArrayList; - } - - @Override - public Set<K> keySet() - { - throw new UnsupportedOperationException(); - } - - @Override - public Multiset<K> keys() - { - throw new UnsupportedOperationException(); - } - - @Override - public Collection<V> values() - { - throw new UnsupportedOperationException(); - } - - @Override - public Collection<Map.Entry<K, V>> entries() - { - throw new UnsupportedOperationException(); - } - - @Override - public List<V> removeAll(@Nullable Object key) - { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() - { - throw new UnsupportedOperationException(); - } - - @Override - public int size() - { - // TODO: This is actually wrong since in a 
Multimap, size() should return the number of entries, not the number of distinct keys - return map.size(); - } - - @Override - public boolean isEmpty() - { - return map.isEmpty(); - } - - @Override - public boolean containsKey(@Nullable Object key) - { - return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), - SIZE_KEY_SUFFIX)); - } - - @Override - public boolean containsValue(@Nullable Object value) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsEntry(@Nullable Object key, @Nullable Object value) - { - SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key); - if (spillableArrayList == null) { - return false; - } - for (int i = 0; i < spillableArrayList.size(); i++) { - V v = spillableArrayList.get(i); - if (v == null) { - if (value == null) { - return true; - } - } else { - if (v.equals(value)) { - return true; - } - } - } - return false; - } - - @Override - public boolean put(@Nullable K key, @Nullable V value) - { - SpillableArrayListImpl<V> spillableArrayList = getHelper(key); - - if (spillableArrayList == null) { - Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key)); - spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); - - cache.put(key, spillableArrayList); - } - - spillableArrayList.add(value); - return true; - } - - @Override - public boolean remove(@Nullable Object key, @Nullable Object value) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean putAll(@Nullable K key, Iterable<? extends V> values) - { - boolean changed = false; - - for (V value: values) { - changed |= put(key, value); - } - - return changed; - } - - @Override - public boolean putAll(Multimap<? extends K, ? extends V> multimap) - { - boolean changed = false; - - for (Map.Entry<? extends K, ? 
extends V> entry: multimap.entries()) { - changed |= put(entry.getKey(), entry.getValue()); - } - - return changed; - } - - @Override - public List<V> replaceValues(K key, Iterable<? extends V> values) - { - throw new UnsupportedOperationException(); - } - - @Override - public Map<K, Collection<V>> asMap() - { - throw new UnsupportedOperationException(); - } - - @Override - public void setup(Context.OperatorContext context) - { - map.setup(context); - isRunning = true; - } - - @Override - public void beginWindow(long windowId) - { - map.beginWindow(windowId); - isInWindow = true; - } - - @Override - public void endWindow() - { - isInWindow = false; - for (K key: cache.getChangedKeys()) { - - SpillableArrayListImpl<V> spillableArrayList = cache.get(key); - spillableArrayList.endWindow(); - - Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX), - spillableArrayList.size()); - } - - Preconditions.checkState(cache.getRemovedKeys().isEmpty()); - cache.endWindow(); - map.endWindow(); - } - - @Override - public void teardown() - { - isRunning = false; - map.teardown(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java deleted file mode 100644 index f36f2dc..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.spillable; - -import java.io.Serializable; -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -import javax.validation.constraints.NotNull; - -import org.apache.apex.malhar.lib.state.BucketedState; -import org.apache.apex.malhar.lib.utils.serde.Serde; -import org.apache.apex.malhar.lib.utils.serde.SliceUtils; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.mutable.MutableInt; -import org.apache.hadoop.classification.InterfaceStability; - -import com.esotericsoftware.kryo.DefaultSerializer; -import com.esotericsoftware.kryo.serializers.FieldSerializer; -import com.google.common.base.Preconditions; - -import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; - -/** - * A Spillable implementation of {@link Map} - * @param <K> The types of keys. - * @param <V> The types of values. 
- * - * @since 3.5.0 - */ -@DefaultSerializer(FieldSerializer.class) -@InterfaceStability.Evolving -public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent, - Serializable -{ - private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>(); - private transient MutableInt tempOffset = new MutableInt(); - - @NotNull - private SpillableStateStore store; - @NotNull - private byte[] identifier; - private long bucket; - @NotNull - private Serde<K, Slice> serdeKey; - @NotNull - private Serde<V, Slice> serdeValue; - - private int size = 0; - - private SpillableByteMapImpl() - { - //for kryo - } - - /** - * Creates a {@link SpillableByteMapImpl}. - * @param store The {@link SpillableStateStore} in which to spill to. - * @param identifier The Id of this {@link SpillableByteMapImpl}. - * @param bucket The Id of the bucket used to store this - * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}. - * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. - * @param serdeValue The {@link Serde} to use when serializing and deserializing values. 
- */ - public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) - { - this.store = Preconditions.checkNotNull(store); - this.identifier = Preconditions.checkNotNull(identifier); - this.bucket = bucket; - this.serdeKey = Preconditions.checkNotNull(serdeKey); - this.serdeValue = Preconditions.checkNotNull(serdeValue); - } - - public SpillableStateStore getStore() - { - return this.store; - } - - @Override - public int size() - { - return size; - } - - @Override - public boolean isEmpty() - { - return size == 0; - } - - @Override - public boolean containsKey(Object o) - { - return get(o) != null; - } - - @Override - public boolean containsValue(Object o) - { - throw new UnsupportedOperationException(); - } - - @Override - public V get(Object o) - { - K key = (K)o; - - if (cache.getRemovedKeys().contains(key)) { - return null; - } - - V val = cache.get(key); - - if (val != null) { - return val; - } - - Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key))); - - if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { - return null; - } - - tempOffset.setValue(0); - return serdeValue.deserialize(valSlice, tempOffset); - } - - @Override - public V put(K k, V v) - { - V value = get(k); - - if (value == null) { - size++; - } - - cache.put(k, v); - - return value; - } - - @Override - public V remove(Object o) - { - V value = get(o); - - if (value != null) { - size--; - } - - cache.remove((K)o); - - return value; - } - - @Override - public void putAll(Map<? extends K, ? extends V> map) - { - for (Map.Entry<? extends K, ? 
extends V> entry : map.entrySet()) { - put(entry.getKey(), entry.getValue()); - } - } - - @Override - public void clear() - { - throw new UnsupportedOperationException(); - } - - @Override - public Set<K> keySet() - { - throw new UnsupportedOperationException(); - } - - @Override - public Collection<V> values() - { - throw new UnsupportedOperationException(); - } - - @Override - public Set<Entry<K, V>> entrySet() - { - throw new UnsupportedOperationException(); - } - - @Override - public void setup(Context.OperatorContext context) - { - } - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - for (K key: cache.getChangedKeys()) { - store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), - serdeValue.serialize(cache.get(key))); - } - - for (K key: cache.getRemovedKeys()) { - store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), - new Slice(ArrayUtils.EMPTY_BYTE_ARRAY)); - } - - cache.endWindow(); - } - - @Override - public void teardown() - { - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java index e4836c4..c4462d5 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java @@ -36,75 +36,75 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S Operator.CheckpointNotificationListener { /** - * This is a method for creating a {@link SpillableArrayList}. 
This method + * This is a method for creating a {@link SpillableList}. This method * auto-generates an identifier for the data structure. - * @param <T> The type of data stored in the {@link SpillableArrayList}. - * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to. - * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}. - * @return A {@link SpillableArrayList}. + * @param <T> The type of data stored in the {@link SpillableList}. + * @param bucket The bucket that this {@link SpillableList} will be spilled to. + * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}. + * @return A {@link SpillableList}. */ - <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde); + <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde); /** - * This is a method for creating a {@link SpillableArrayList}. - * @param <T> The type of data stored in the {@link SpillableArrayList}. - * @param identifier The identifier for this {@link SpillableArrayList}. - * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to. - * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}. - * @return A {@link SpillableArrayList}. + * This is a method for creating a {@link SpillableList}. + * @param <T> The type of data stored in the {@link SpillableList}. + * @param identifier The identifier for this {@link SpillableList}. + * @param bucket The bucket that this {@link SpillableList} will be spilled to. + * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}. + * @return A {@link SpillableList}. 
*/ - <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde); + <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde); /** - * This is a method for creating a {@link SpillableByteMap}. This method + * This is a method for creating a {@link SpillableMap}. This method * auto-generates an identifier for the data structure. * @param <K> The type of the keys. * @param <V> The type of the values. - * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to. + * @param bucket The bucket that this {@link SpillableMap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the map's values. - * @return A {@link SpillableByteMap}. + * @return A {@link SpillableMap}. */ - <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey, + <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue); /** - * This is a method for creating a {@link SpillableByteMap}. + * This is a method for creating a {@link SpillableMap}. * @param <K> The type of the keys. * @param <V> The type of the values. - * @param identifier The identifier for this {@link SpillableByteMap}. - * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to. + * @param identifier The identifier for this {@link SpillableMap}. + * @param bucket The bucket that this {@link SpillableMap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the map's values. - * @return A {@link SpillableByteMap}. + * @return A {@link SpillableMap}. 
*/ - <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, + <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue); /** - * This is a method for creating a {@link SpillableByteArrayListMultimap}. This method + * This is a method for creating a {@link SpillableListMultimap}. This method * auto-generates an identifier for the data structure. * @param <K> The type of the keys. * @param <V> The type of the values in the map's lists. - * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to. + * @param bucket The bucket that this {@link SpillableListMultimap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. - * @return A {@link SpillableByteArrayListMultimap}. + * @return A {@link SpillableListMultimap}. */ - <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K, + <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue); /** - * This is a method for creating a {@link SpillableByteArrayListMultimap}. + * This is a method for creating a {@link SpillableListMultimap}. * @param <K> The type of the keys. * @param <V> The type of the values in the map's lists. - * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}. - * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to. + * @param identifier The identifier for this {@link SpillableListMultimap}. + * @param bucket The bucket that this {@link SpillableListMultimap} will be spilled to. * @param serdeKey The Serializer/Deserializer to use for the map's keys. * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists. 
- * @return A {@link SpillableByteArrayListMultimap}. + * @return A {@link SpillableListMultimap}. */ - <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket, + <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue); @@ -121,24 +121,24 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S Slice> serdeKey, Serde<V, Slice> serdeValue); /** - * This is a method for creating a {@link SpillableByteMultiset}. This method + * This is a method for creating a {@link SpillableMultiset}. This method * auto-generates an identifier for the data structure. * @param <T> The type of the elements. - * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to. - * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}. - * @return A {@link SpillableByteMultiset}. + * @param bucket The bucket that this {@link SpillableMultiset} will be spilled to. + * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}. + * @return A {@link SpillableMultiset}. */ - <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde); + <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde); /** - * This is a method for creating a {@link SpillableByteMultiset}. + * This is a method for creating a {@link SpillableMultiset}. * @param <T> The type of the elements. - * @param identifier The identifier for this {@link SpillableByteMultiset}. - * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to. - * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}. - * @return A {@link SpillableByteMultiset}. + * @param identifier The identifier for this {@link SpillableMultiset}. 
+ * @param bucket The bucket that this {@link SpillableMultiset} will be spilled to. + * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}. + * @return A {@link SpillableMultiset}. */ - <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde); + <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde); /** * This is a method for creating a {@link SpillableQueue}. This method @@ -153,7 +153,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S /** * This is a method for creating a {@link SpillableQueue}. * @param <T> The type of the data stored in the {@link SpillableQueue}. - * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}. + * @param identifier The identifier for this {@link SpillableQueue}. * @param bucket The bucket that this {@link SpillableQueue} will be spilled to. * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}. * @return A {@link SpillableQueue}. 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java index 9c3defc..aad219d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java @@ -66,14 +66,14 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator); } - public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde) + public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde) { SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde); componentList.add(list); return list; } - public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde) + public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde) { identifierGenerator.register(identifier); SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde); @@ -81,39 +81,39 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent return list; } - public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey, + public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) { - SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, 
identifierGenerator.next(), + SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); componentList.add(map); return map; } - public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey, + public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) { identifierGenerator.register(identifier); - SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue); + SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue); componentList.add(map); return map; } - public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K, + public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) { - SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store, + SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store, identifierGenerator.next(), bucket, serdeKey, serdeValue); componentList.add(map); return map; } - public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket, + public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) { identifierGenerator.register(identifier); - SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store, + SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue); componentList.add(map); return map; @@ -128,12 +128,12 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent return 
map; } - public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde) + public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde) { throw new UnsupportedOperationException("Unsupported Operation"); } - public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde) + public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde) { throw new UnsupportedOperationException("Unsupported Operation"); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java new file mode 100644 index 0000000..016aeec --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * A Spillable implementation of {@link Map} + * @param <K> The types of keys. + * @param <V> The types of values. + * + * @since 3.5.0 + */ +@DefaultSerializer(FieldSerializer.class) +@InterfaceStability.Evolving +public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent, + Serializable +{ + private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>(); + private transient MutableInt tempOffset = new MutableInt(); + + @NotNull + private SpillableStateStore store; + @NotNull + private byte[] identifier; + private long bucket; + @NotNull + private Serde<K, Slice> serdeKey; + @NotNull + private Serde<V, Slice> serdeValue; + + private int size = 0; + + private SpillableMapImpl() + { + //for kryo + } + + /** + * Creates a {@link SpillableMapImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableMapImpl}. + * @param bucket The Id of the bucket used to store this + * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}. + * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. 
+ * @param serdeValue The {@link Serde} to use when serializing and deserializing values. + */ + public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + this.store = Preconditions.checkNotNull(store); + this.identifier = Preconditions.checkNotNull(identifier); + this.bucket = bucket; + this.serdeKey = Preconditions.checkNotNull(serdeKey); + this.serdeValue = Preconditions.checkNotNull(serdeValue); + } + + public SpillableStateStore getStore() + { + return this.store; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean isEmpty() + { + return size == 0; + } + + @Override + public boolean containsKey(Object o) + { + return get(o) != null; + } + + @Override + public boolean containsValue(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public V get(Object o) + { + K key = (K)o; + + if (cache.getRemovedKeys().contains(key)) { + return null; + } + + V val = cache.get(key); + + if (val != null) { + return val; + } + + Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key))); + + if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { + return null; + } + + tempOffset.setValue(0); + return serdeValue.deserialize(valSlice, tempOffset); + } + + @Override + public V put(K k, V v) + { + V value = get(k); + + if (value == null) { + size++; + } + + cache.put(k, v); + + return value; + } + + @Override + public V remove(Object o) + { + V value = get(o); + + if (value != null) { + size--; + } + + cache.remove((K)o); + + return value; + } + + @Override + public void putAll(Map<? extends K, ? extends V> map) + { + for (Map.Entry<? extends K, ? 
extends V> entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set<K> keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<V> values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set<Entry<K, V>> entrySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Context.OperatorContext context) + { + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + for (K key: cache.getChangedKeys()) { + store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), + serdeValue.serialize(cache.get(key))); + } + + for (K key: cache.getRemovedKeys()) { + store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), + new Slice(ArrayUtils.EMPTY_BYTE_ARRAY)); + } + + cache.endWindow(); + } + + @Override + public void teardown() + { + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java index 122cd2d..c2741b0 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java @@ -110,7 +110,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable @NotNull private SpillableStateStore store; @NotNull - private SpillableByteMapImpl<T, ListNode<T>> map; + private SpillableMapImpl<T, ListNode<T>> map; private T head; private int size; @@ 
-139,7 +139,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable { this.store = Preconditions.checkNotNull(store); - map = new SpillableByteMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde)); + map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde)); } public void setSize(int size) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java index c227ed7..98f60d2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java @@ -61,7 +61,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>(); @NotNull - private SpillableByteMapImpl<Slice, Pair<Integer, V>> map; + private SpillableMapImpl<Slice, Pair<Integer, V>> map; private SpillableStateStore store; private byte[] identifier; private long bucket; @@ -93,7 +93,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul this.serdeKey = Preconditions.checkNotNull(serdeKey); this.serdeValue = Preconditions.checkNotNull(serdeValue); - map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue)); + map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue)); } public SpillableStateStore getStore() 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java index ac77d1b..ac386ab 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java @@ -53,7 +53,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind protected Serde<K, Slice> keySerde; protected Serde<V, Slice> valueSerde; - protected Spillable.SpillableByteMap<Pair<Window, K>, V> windowKeyToValueMap; + protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap; protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap; private class KVIterator implements Iterator<Map.Entry<K, V>> @@ -181,7 +181,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind } if (windowKeyToValueMap == null) { - windowKeyToValueMap = scc.newSpillableByteMap(bucket, windowKeyPairSerde, valueSerde); + windowKeyToValueMap = scc.newSpillableMap(bucket, windowKeyPairSerde, valueSerde); } if (windowToKeysMap == null) { windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java 
index 81f5dbb..6666381 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java @@ -45,7 +45,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe private Serde<Window, Slice> windowSerde; private Serde<T, Slice> valueSerde; - protected Spillable.SpillableByteMap<Window, T> windowToDataMap; + protected Spillable.SpillableMap<Window, T> windowToDataMap; public SpillableWindowedPlainStorage() { @@ -134,7 +134,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe valueSerde = new SerdeKryoSlice<>(); } if (windowToDataMap == null) { - windowToDataMap = scc.newSpillableByteMap(bucket, windowSerde, valueSerde); + windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java new file mode 100644 index 0000000..82fb340 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.state.spillable;

import java.util.List;
import java.util.Random;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
import org.apache.apex.malhar.lib.utils.serde.SliceUtils;

import com.google.common.collect.Lists;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.Slice;

/**
 * Tests for {@link SpillableArrayListMultimapImpl}.
 *
 * <p>The tests drive the multimap and its backing {@link SpillableStateStore} through explicit
 * {@code beginWindow}/{@code endWindow} cycles and verify both the view exposed by the multimap
 * and the raw values written to the store.
 */
public class SpillableArrayListMultimapImplTest
{
  /** Identifier prefix handed to every multimap created by these tests. */
  public static final byte[] ID1 = new byte[]{(byte)0};

  @Rule
  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();

  /**
   * Runs the multi-key scenario against an in-memory store.
   */
  @Test
  public void simpleMultiKeyTest()
  {
    InMemSpillableStateStore store = new InMemSpillableStateStore();

    simpleMultiKeyTestHelper(store);
  }

  /**
   * Runs the multi-key scenario against the store supplied by the {@link #testMeta} rule
   * (presumably backed by managed state, as the test name suggests).
   */
  @Test
  public void simpleMultiKeyManagedStateTest()
  {
    simpleMultiKeyTestHelper(testMeta.store);
  }

  /**
   * Populates the keys "a", "b" and "c" one after another and checks that the number of keys
   * reported by the multimap grows by one after each key.
   *
   * @param store the store backing the multimap under test
   */
  public void simpleMultiKeyTestHelper(SpillableStateStore store)
  {
    SpillableArrayListMultimapImpl<String, String> map =
        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());

    store.setup(testMeta.operatorContext);
    map.setup(testMeta.operatorContext);

    String[] keys = {"a", "b", "c"};
    long nextWindowId = 0L;

    for (int i = 0; i < keys.length; i++) {
      // Always continue from the window id returned by the helper. The helper consumes several
      // windows, so ignoring its result would make the next window id go backwards.
      nextWindowId = simpleMultiKeyTestHelper(store, map, keys[i], nextWindowId);

      nextWindowId++;
      beginWindow(store, map, nextWindowId);
      Assert.assertEquals(i + 1, map.size());
      endWindow(store, map);

      nextWindowId++;
    }

    map.teardown();
    store.teardown();
  }

  /**
   * Exercises a single key over four consecutive windows: initial put and bulk append, append
   * through a freshly fetched list, in-place replacement via {@code set}, and a final window
   * that only verifies what was persisted.
   *
   * @param store the store backing {@code map}
   * @param map the multimap under test; it must already be set up
   * @param key the key to populate; it must not yet be present in {@code map}
   * @param nextWindowId the last window id already used; the first window opened here is
   *        {@code nextWindowId + 1}
   * @return the id of the last window used by this method
   */
  public long simpleMultiKeyTestHelper(SpillableStateStore store,
      SpillableArrayListMultimapImpl<String, String> map, String key, long nextWindowId)
  {
    SerdeStringSlice serdeString = new SerdeStringSlice();
    SerdeIntSlice serdeInt = new SerdeIntSlice();

    Slice keySlice = serdeString.serialize(key);
    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());

    // Window 1: the key is absent, then receives one value followed by a bulk append.
    nextWindowId++;
    beginWindow(store, map, nextWindowId);

    Assert.assertNull(map.get(key));
    Assert.assertFalse(map.containsKey(key));

    map.put(key, "a");

    Assert.assertTrue(map.containsKey(key));

    List<String> list1 = map.get(key);
    assertListContents(list1, "a");

    list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
    assertListContents(list1, "a", "a", "b", "c", "d", "e", "f", "g");

    endWindow(store, map);

    // Window 2: the previous window's data is visible in the store; append four more values.
    nextWindowId++;
    beginWindow(store, map, nextWindowId);

    SpillableTestUtils.checkValue(store, 0L,
        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt);
    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0,
        Lists.<String>newArrayList("a", "a", "b", "c", "d", "e", "f", "g"));

    List<String> list2 = map.get(key);
    assertListContents(list2, "a", "a", "b", "c", "d", "e", "f", "g");

    list2.add("tt");
    list2.add("ab");
    list2.add("99");
    list2.add("oo");

    assertListContents(list2, "a", "a", "b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo");

    endWindow(store, map);

    // Window 3: replace a few elements in place through a freshly fetched list.
    nextWindowId++;
    beginWindow(store, map, nextWindowId);

    // The list handle obtained in the previous window must still report the same size.
    Assert.assertEquals(12, list2.size());

    SpillableTestUtils.checkValue(store, 0L,
        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0,
        Lists.<String>newArrayList("a", "a", "b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo"));

    List<String> list3 = map.get(key);

    list3.set(1, "111");
    list3.set(3, "222");
    list3.set(5, "333");
    list3.set(11, "444");

    // Checks the size of list3, the list that was just modified.
    assertListContents(list3, "a", "111", "b", "222", "d", "333", "f", "g", "tt", "ab", "99", "444");

    endWindow(store, map);

    // Window 4: only verify that the replacements reached the store.
    nextWindowId++;
    beginWindow(store, map, nextWindowId);

    SpillableTestUtils.checkValue(store, 0L,
        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0,
        Lists.<String>newArrayList("a", "111", "b", "222", "d", "333", "f", "g", "tt", "ab", "99", "444"));

    endWindow(store, map);

    return nextWindowId;
  }

  /**
   * Clones the multimap at a checkpoint, keeps modifying the original, then sets up the clone
   * with the checkpoint window as its activation window and verifies that it only sees the
   * data written up to the checkpoint.
   */
  @Test
  public void recoveryTestWithManagedState()
  {
    SpillableStateStore store = testMeta.store;

    SpillableArrayListMultimapImpl<String, String> map =
        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());

    store.setup(testMeta.operatorContext);
    map.setup(testMeta.operatorContext);

    long nextWindowId = 0L;
    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
    long activationWindow = nextWindowId;

    // Take the clone between beforeCheckpoint and checkpointed, i.e. at the checkpoint.
    store.beforeCheckpoint(nextWindowId);
    SpillableArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map);
    store.checkpointed(nextWindowId);
    store.committed(nextWindowId);

    // Modify the original after the checkpoint; the clone must not observe this change.
    nextWindowId++;
    beginWindow(store, map, nextWindowId);

    List<String> list1 = map.get("a");
    assertListContents(list1, "a", "111", "b", "222", "d", "333", "f", "g", "tt", "ab", "99", "444");

    list1.add("111");
    assertListContents(list1, "a", "111", "b", "222", "d", "333", "f", "g", "tt", "ab", "99", "444", "111");

    endWindow(store, map);

    map.teardown();
    store.teardown();

    // Continue with the clone, activated at the checkpointed window.
    map = clonedMap;
    store = map.getStore();

    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
    Context.OperatorContext context =
        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);

    store.setup(context);
    map.setup(context);

    nextWindowId = activationWindow + 1;
    beginWindow(store, map, nextWindowId);

    SerdeStringSlice serdeString = new SerdeStringSlice();
    Slice keySlice = serdeString.serialize("a");
    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());

    // The value appended after the checkpoint must be absent.
    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0,
        Lists.<String>newArrayList("a", "111", "b", "222", "d", "333", "f", "g", "tt", "ab", "99", "444"));

    Assert.assertEquals(1, map.size());
    Assert.assertEquals(12, map.get("a").size());

    endWindow(store, map);

    map.teardown();
    store.teardown();
  }

  /**
   * Inserts a large number of pseudo-random entries within a single window. The test passes if
   * the inserts and the window end complete without throwing.
   */
  @Test
  public void testLoad()
  {
    // Fixed seed so that a failing run can be reproduced with the same keys and values.
    final Random random = new Random(0L);
    final int keySize = 1000000;
    final int valueSize = 100000000;
    final int numOfEntry = 100000;

    SpillableStateStore store = testMeta.store;
    SpillableArrayListMultimapImpl<String, String> multimap =
        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());

    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
    Context.OperatorContext context =
        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
    store.setup(context);
    multimap.setup(context);

    beginWindow(store, multimap, 1);
    for (int i = 0; i < numOfEntry; ++i) {
      multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize)));
    }
    endWindow(store, multimap);

    // Release resources in the same order as the other tests in this class.
    multimap.teardown();
    store.teardown();
  }

  /**
   * Opens a window on the store first and then on the multimap.
   */
  private static void beginWindow(SpillableStateStore store, SpillableArrayListMultimapImpl<String, String> map,
      long windowId)
  {
    store.beginWindow(windowId);
    map.beginWindow(windowId);
  }

  /**
   * Closes the current window on the multimap first and then on the store.
   */
  private static void endWindow(SpillableStateStore store, SpillableArrayListMultimapImpl<String, String> map)
  {
    map.endWindow();
    store.endWindow();
  }

  /**
   * Asserts that {@code actual} has exactly the given elements in the given order. Only
   * {@code size()} and {@code get(int)} are used, the same accessors the individual
   * assertions in these tests relied on.
   */
  private static void assertListContents(List<String> actual, String... expected)
  {
    Assert.assertEquals(expected.length, actual.size());
    for (int i = 0; i < expected.length; i++) {
      Assert.assertEquals(expected[i], actual.get(i));
    }
  }
}
