APEXMALHAR-2048 Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9b6e11d8 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9b6e11d8 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9b6e11d8 Branch: refs/heads/master Commit: 9b6e11d85accc88faa08d7b4a8daeb9b069fc878 Parents: d1fb2b6 Author: Timothy Farkas <[email protected]> Authored: Sun Jul 17 14:32:34 2016 -0700 Committer: David Yan <[email protected]> Committed: Mon Aug 15 12:15:21 2016 -0700 ---------------------------------------------------------------------- .../SequentialSpillableIdentifierGenerator.java | 84 +++ .../malhar/lib/state/spillable/Spillable.java | 14 +- .../state/spillable/SpillableArrayListImpl.java | 326 ++++++++++ .../SpillableByteArrayListMultimapImpl.java | 291 +++++++++ .../state/spillable/SpillableByteMapImpl.java | 235 ++++++++ .../spillable/SpillableComplexComponent.java | 4 +- .../SpillableComplexComponentImpl.java | 193 ++++++ .../spillable/SpillableIdentifierGenerator.java | 41 ++ .../state/spillable/SpillableStateStore.java | 35 ++ .../state/spillable/TimeBasedPriorityQueue.java | 154 +++++ .../state/spillable/WindowBoundedMapCache.java | 129 ++++ .../lib/state/spillable/WindowListener.java | 42 ++ .../state/spillable/inmem/InMemMultiset.java | 161 ----- .../inmem/InMemSpillableArrayList.java | 175 ------ .../InMemSpillableByteArrayListMultimap.java | 154 ----- .../inmem/InMemSpillableComplexComponent.java | 117 ---- .../inmem/InMemSpillableStateStore.java | 118 ++++ .../ManagedStateSpillableStateStore.java | 34 ++ .../lib/utils/serde/PassThruByteArraySerde.java | 2 + .../serde/PassThruByteArraySliceSerde.java | 59 ++ .../lib/utils/serde/PassThruSliceSerde.java | 50 ++ .../malhar/lib/utils/serde/SerdeIntSlice.java | 52 ++ .../malhar/lib/utils/serde/SerdeListSlice.java | 109 ++++ .../lib/utils/serde/SerdeStringSlice.java | 53 ++ .../apex/malhar/lib/utils/serde/SliceUtils.java | 101 ++++ 
.../com/datatorrent/lib/util/TestUtils.java | 25 + .../state/managed/ManagedStateTestUtils.java | 13 +- .../spillable/inmem/InMemMultisetTest.java | 44 -- .../inmem/InMemSpillableArrayListTest.java | 44 -- ...InMemSpillableByteArrayListMultimapTest.java | 45 -- ...uentialSpillableIdentifierGeneratorTest.java | 125 ++++ .../spillable/SpillableArrayListImplTest.java | 594 +++++++++++++++++++ .../SpillableByteArrayListMultimapImplTest.java | 341 +++++++++++ .../spillable/SpillableByteMapImplTest.java | 484 +++++++++++++++ .../SpillableComplexComponentImplTest.java | 63 ++ .../lib/state/spillable/SpillableTestUtils.java | 134 +++++ .../spillable/TimeBasedPriorityQueueTest.java | 134 +++++ .../spillable/WindowBoundedMapCacheTest.java | 116 ++++ .../inmem/InMemorySpillableStateStoreTest.java | 60 ++ .../lib/utils/serde/SerdeListSliceTest.java | 45 ++ 40 files changed, 4240 insertions(+), 760 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java new file mode 100644 index 0000000..600fa98 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; + +/** + * This is an id generator that generates single byte ids for Spillable datastructures. + */ [email protected] +public class SequentialSpillableIdentifierGenerator implements SpillableIdentifierGenerator +{ + private boolean nextCalled = false; + private boolean done = false; + private byte currentIdentifier = 0; + + private Set<Byte> registeredIdentifier = Sets.newHashSet(); + + @Override + public byte[] next() + { + Preconditions.checkState(!done); + + nextCalled = true; + + byte nextIndentifier = currentIdentifier; + seek(); + + return new byte[]{nextIndentifier}; + } + + @Override + public void register(byte[] identifierArray) + { + Preconditions.checkState(!nextCalled); + Preconditions.checkState(!done); + Preconditions.checkArgument(identifierArray.length == 1); + + byte identifier = identifierArray[0]; + + Preconditions.checkState(identifier >= currentIdentifier && + !registeredIdentifier.contains(identifier)); + + registeredIdentifier.add(identifier); + + if (currentIdentifier == identifier) { + seek(); + } + } + + private void seek() + { + if (currentIdentifier == Byte.MAX_VALUE) { + done = true; + } else { + do { + currentIdentifier++; + 
} while (registeredIdentifier.contains(currentIdentifier) && currentIdentifier < Byte.MAX_VALUE); + + done = currentIdentifier == Byte.MAX_VALUE && registeredIdentifier.contains(currentIdentifier); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java index 41a0efc..4c9b997 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java @@ -98,19 +98,7 @@ public interface Spillable * should implement this interface. A user working with an implementation of this interface needs * to make sure that the {@link com.datatorrent.api.Operator} call-backs are propagated to it. */ - interface SpillableComponent extends Component<OperatorContext>, Spillable + interface SpillableComponent extends Component<OperatorContext>, Spillable, WindowListener { - /** - * This signals that the parent {@link com.datatorrent.api.Operator}'s - * {@link com.datatorrent.api.Operator#beginWindow(long)} method has been called. - * @param windowId The next windowId of the parent operator. - */ - void beginWindow(long windowId); - - /** - * This signals that the parent {@link com.datatorrent.api.Operator}'s - * {@link com.datatorrent.api.Operator#endWindow()} method has been called. 
- */ - void endWindow(); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java new file mode 100644 index 0000000..5d46906 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java @@ -0,0 +1,326 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}. + * @param <T> The type of object stored in the {@link SpillableArrayListImpl}. + */ +@DefaultSerializer(FieldSerializer.class) [email protected] +public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + + private int batchSize = DEFAULT_BATCH_SIZE; + private long bucketId; + private byte[] prefix; + + @NotNull + private SpillableStateStore store; + @NotNull + private Serde<T, Slice> serde; + @NotNull + private SpillableByteMapImpl<Integer, List<T>> map; + + private boolean sizeCached = false; + private int size; + private int numBatches; + + private SpillableArrayListImpl() + { + //for kryo + } + + public SpillableStateStore getStore() + { + return store; + } + + /** + * Creates a {@link SpillableArrayListImpl}. + * @param bucketId The Id of the bucket used to store this + * {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}. + * @param prefix The Id of this {@link SpillableArrayListImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. 
+ * @param serde The {@link Serde} to use when serializing and deserializing data. + */ + public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, + @NotNull SpillableStateStore store, + @NotNull Serde<T, Slice> serde) + { + this.bucketId = bucketId; + this.prefix = Preconditions.checkNotNull(prefix); + this.store = Preconditions.checkNotNull(store); + this.serde = Preconditions.checkNotNull(serde); + + map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeListSlice(serde)); + } + + /** + * Creates a {@link SpillableArrayListImpl}. + * @param bucketId The Id of the bucket used to store this + * {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}. + * @param prefix The Id of this {@link SpillableArrayListImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param serde The {@link Serde} to use when serializing and deserializing data. + * @param batchSize When spilled to a {@link SpillableStateStore} data is stored in a batch. This determines the + * number of elements a batch will contain when it's spilled. Having small batches will increase + * the number of keys stored by your {@link SpillableStateStore} but will improve random reads and + * writes. Increasing the batch size will improve sequential read and write speed. 
+ */ + public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix, + @NotNull SpillableStateStore store, + @NotNull Serde<T, Slice> serde, + int batchSize) + { + this(bucketId, prefix, store, serde); + + Preconditions.checkArgument(batchSize > 0, "batchSize must be positive: %s", batchSize); + this.batchSize = batchSize; + } + + public void setSize(int size) + { + Preconditions.checkArgument(size >= 0); + this.size = size; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean isEmpty() + { + return size == 0; + } + + @Override + public boolean contains(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public Iterator<T> iterator() + { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() + { + throw new UnsupportedOperationException(); + } + + @Override + public <T1> T1[] toArray(T1[] t1s) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(T t) + { + Preconditions.checkArgument((size() + 1) > 0); + + int batchIndex = (size / batchSize); + + List<T> batch = null; + + if (size % batchSize == 0) { // a new batch starts at every multiple of batchSize (also after setSize()) + batch = Lists.newArrayListWithCapacity(batchSize); + numBatches++; + } else { + batch = map.get(batchIndex); + } + + batch.add(t); + + size++; + map.put(batchIndex, batch); + return true; + } + + @Override + public boolean remove(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection<?> collection) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection<? extends T> collection) + { + for (T element: collection) { + add(element); + } + + return true; + } + + @Override + public boolean addAll(int i, Collection<?
extends T> collection) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection<?> collection) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection<?> collection) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public T get(int i) + { + if (i < 0 || i >= size) { + throw new IndexOutOfBoundsException("Index: " + i + ", Size: " + size); + } + + int batchIndex = i / batchSize; + int batchOffset = i % batchSize; + + List<T> batch = map.get(batchIndex); + return batch.get(batchOffset); + } + + @Override + public T set(int i, T t) + { + if (i < 0 || i >= size) { + throw new IndexOutOfBoundsException("Index: " + i + ", Size: " + size); + } + + int batchIndex = i / batchSize; + int batchOffset = i % batchSize; + + List<T> batch = map.get(batchIndex); + T old = batch.get(batchOffset); + batch.set(batchOffset, t); + map.put(batchIndex, batch); + return old; + } + + @Override + public void add(int i, T t) + { + throw new UnsupportedOperationException(); + } + + @Override + public T remove(int i) + { + throw new UnsupportedOperationException(); + } + + @Override + public int indexOf(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public int lastIndexOf(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public ListIterator<T> listIterator() + { + throw new UnsupportedOperationException(); + } + + @Override + public ListIterator<T> listIterator(int i) + { + throw new UnsupportedOperationException(); + } + + @Override + public List<T> subList(int i, int i1) + { + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Context.OperatorContext context) + { + map.setup(context); + } + + @Override + public void beginWindow(long windowId) + { + map.beginWindow(windowId); + } + + @Override + public void endWindow() + { + map.endWindow(); + } + + @Override + public void
teardown() + { + map.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java new file mode 100644 index 0000000..ba0bb77 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}. + */ +@DefaultSerializer(FieldSerializer.class) [email protected] +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>, + Spillable.SpillableComponent +{ + public static final int DEFAULT_BATCH_SIZE = 1000; + public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0}; + + private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>(); + private transient boolean isRunning = false; + private transient boolean isInWindow = false; + + private int batchSize = DEFAULT_BATCH_SIZE; + @NotNull + private SpillableByteMapImpl<byte[], Integer> map; + private SpillableStateStore store; + private byte[] identifier; + private long bucket; + private Serde<K, Slice> serdeKey; + private Serde<V, Slice> serdeValue; + + private SpillableByteArrayListMultimapImpl() + { + // for kryo + } + + /** + * Creates a {@link SpillableByteArrayListMultimapImpl}. 
+ * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableByteArrayListMultimapImpl}. + * @param bucket The Id of the bucket used to store this + * {@link SpillableByteArrayListMultimapImpl} in the provided {@link SpillableStateStore}. + * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. + * @param serdeValue The {@link Serde} to use when serializing and deserializing values. + */ + public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + this.store = Preconditions.checkNotNull(store); + this.identifier = Preconditions.checkNotNull(identifier); + this.bucket = bucket; + this.serdeKey = Preconditions.checkNotNull(serdeKey); + this.serdeValue = Preconditions.checkNotNull(serdeValue); + + map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice()); + } + + public SpillableStateStore getStore() + { + return store; + } + + @Override + public List<V> get(@Nullable K key) + { + return getHelper(key); + } + + private SpillableArrayListImpl<V> getHelper(@Nullable K key) + { + SpillableArrayListImpl<V> spillableArrayList = cache.get(key); + + if (spillableArrayList == null) { + Slice keySlice = serdeKey.serialize(key); + Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray()); + + if (size == null) { + return null; + } + + Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice); + spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); + spillableArrayList.setSize(size); + } + + cache.put(key, spillableArrayList); + + return spillableArrayList; + } + + @Override + public Set<K> keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Multiset<K> keys() + { + throw new UnsupportedOperationException(); + } +
@Override + public Collection<V> values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<Map.Entry<K, V>> entries() + { + throw new UnsupportedOperationException(); + } + + @Override + public List<V> removeAll(@Nullable Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return map.size(); + } + + @Override + public boolean isEmpty() + { + return map.isEmpty(); + } + + @Override + public boolean containsKey(@Nullable Object key) + { + return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), + SIZE_KEY_SUFFIX).toByteArray()); + } + + @Override + public boolean containsValue(@Nullable Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsEntry(@Nullable Object key, @Nullable Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean put(@Nullable K key, @Nullable V value) + { + SpillableArrayListImpl<V> spillableArrayList = getHelper(key); + + if (spillableArrayList == null) { + Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key)); + spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue); + + cache.put(key, spillableArrayList); + } + + spillableArrayList.add(value); + return true; + } + + @Override + public boolean remove(@Nullable Object key, @Nullable Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean putAll(@Nullable K key, Iterable<? extends V> values) + { + boolean changed = false; + + for (V value: values) { + changed |= put(key, value); + } + + return changed; + } + + @Override + public boolean putAll(Multimap<? extends K, ? extends V> multimap) + { + boolean changed = false; + + for (Map.Entry<? extends K, ? 
extends V> entry: multimap.entries()) { + changed |= put(entry.getKey(), entry.getValue()); + } + + return changed; + } + + @Override + public List<V> replaceValues(K key, Iterable<? extends V> values) + { + throw new UnsupportedOperationException(); + } + + @Override + public Map<K, Collection<V>> asMap() + { + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Context.OperatorContext context) + { + map.setup(context); + isRunning = true; + } + + @Override + public void beginWindow(long windowId) + { + map.beginWindow(windowId); + isInWindow = true; + } + + @Override + public void endWindow() + { + isInWindow = false; + for (K key: cache.getChangedKeys()) { + + SpillableArrayListImpl<V> spillableArrayList = cache.get(key); + spillableArrayList.endWindow(); + + Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX).toByteArray(), + spillableArrayList.size()); + } + + Preconditions.checkState(cache.getRemovedKeys().isEmpty()); + cache.endWindow(); + map.endWindow(); + } + + @Override + public void teardown() + { + isRunning = false; + map.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java new file mode 100644 index 0000000..da313ee --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.apex.malhar.lib.utils.serde.SliceUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * A Spillable implementation of {@link Map} + * @param <K> The types of keys. + * @param <V> The types of values. 
+ */ +@DefaultSerializer(FieldSerializer.class) [email protected] +public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent, + Serializable +{ + private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>(); + private transient MutableInt tempOffset = new MutableInt(); + + @NotNull + private SpillableStateStore store; + @NotNull + private byte[] identifier; + private long bucket; + @NotNull + private Serde<K, Slice> serdeKey; + @NotNull + private Serde<V, Slice> serdeValue; + + private int size = 0; + + private SpillableByteMapImpl() + { + //for kryo + } + + /** + * Creates a {@link SpillableByteMapImpl}. + * @param store The {@link SpillableStateStore} in which to spill to. + * @param identifier The Id of this {@link SpillableByteMapImpl}. + * @param bucket The Id of the bucket used to store this + * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}. + * @param serdeKey The {@link Serde} to use when serializing and deserializing keys. + * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+ */ + public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + this.store = Preconditions.checkNotNull(store); + this.identifier = Preconditions.checkNotNull(identifier); + this.bucket = bucket; + this.serdeKey = Preconditions.checkNotNull(serdeKey); + this.serdeValue = Preconditions.checkNotNull(serdeValue); + } + + public SpillableStateStore getStore() + { + return this.store; + } + + @Override + public int size() + { + return size; + } + + @Override + public boolean isEmpty() + { + return size == 0; + } + + @Override + public boolean containsKey(Object o) + { + return get(o) != null; + } + + @Override + public boolean containsValue(Object o) + { + throw new UnsupportedOperationException(); + } + + @Override + public V get(Object o) + { + K key = (K)o; + + if (cache.getRemovedKeys().contains(key)) { + return null; + } + + V val = cache.get(key); + + if (val != null) { + return val; + } + + Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key))); + + if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) { + return null; + } + + tempOffset.setValue(0); + return serdeValue.deserialize(valSlice, tempOffset); + } + + @Override + public V put(K k, V v) + { + V value = get(k); + + if (value == null) { + size++; + } + + cache.put(k, v); + + return value; + } + + @Override + public V remove(Object o) + { + V value = get(o); + + if (value != null) { + size--; + } + + cache.remove((K)o); + + return value; + } + + @Override + public void putAll(Map<? extends K, ? extends V> map) + { + for (Map.Entry<? extends K, ? 
extends V> entry : map.entrySet()) { + put(entry.getKey(), entry.getValue()); + } + } + + @Override + public void clear() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set<K> keySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<V> values() + { + throw new UnsupportedOperationException(); + } + + @Override + public Set<Entry<K, V>> entrySet() + { + throw new UnsupportedOperationException(); + } + + @Override + public void setup(Context.OperatorContext context) + { + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + for (K key: cache.getChangedKeys()) { + store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), + serdeValue.serialize(cache.get(key))); + } + + for (K key: cache.getRemovedKeys()) { + store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)), + new Slice(ArrayUtils.EMPTY_BYTE_ARRAY)); + } + + cache.endWindow(); + } + + @Override + public void teardown() + { + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java index 29da3f5..c63c7ef 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java @@ -23,6 +23,7 @@ import org.apache.apex.malhar.lib.utils.serde.Serde; import com.datatorrent.api.Component; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator; import com.datatorrent.netlet.util.Slice; /** @@ 
-31,7 +32,8 @@ import com.datatorrent.netlet.util.Slice; * * @since 3.4.0 */ -public interface SpillableComplexComponent extends Component<OperatorContext>, SpillableComponent +public interface SpillableComplexComponent extends Component<OperatorContext>, SpillableComponent, + Operator.CheckpointNotificationListener { /** * This is a method for creating a {@link SpillableArrayList}. This method http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java new file mode 100644 index 0000000..b31adfd --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.List; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.utils.serde.Serde; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an + * operator and forwarding the appropriate operator callbacks to the {@link SpillableComplexComponentImpl}. + * Spillable datastructures are created by calling the appropriate factory methods on the + * {@link SpillableComplexComponentImpl} in the setup method of an operator. + */ +@InterfaceStability.Evolving +public class SpillableComplexComponentImpl implements SpillableComplexComponent +{ + private List<SpillableComponent> componentList = Lists.newArrayList(); + + @NotNull + private SpillableStateStore store; + + @NotNull + private SpillableIdentifierGenerator identifierGenerator; + + private SpillableComplexComponentImpl() + { + // for kryo + } + + public SpillableComplexComponentImpl(SpillableStateStore store) + { + this(store, new SequentialSpillableIdentifierGenerator()); + } + + public SpillableComplexComponentImpl(SpillableStateStore store, SpillableIdentifierGenerator identifierGenerator) + { + this.store = Preconditions.checkNotNull(store); + this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator); + } + + public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde) + { + SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde); + componentList.add(list); + return list; + } + + public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde) + { +
identifierGenerator.register(identifier); + SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde); + componentList.add(list); + return list; + } + + public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifierGenerator.next(), + bucket, serdeKey, serdeValue); + componentList.add(map); + return map; + } + + public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + identifierGenerator.register(identifier); + SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue); + componentList.add(map); + return map; + } + + public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K, + Slice> serdeKey, Serde<V, Slice> serdeValue) + { + SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store, + identifierGenerator.next(), bucket, serdeKey, serdeValue); + componentList.add(map); + return map; + } + + public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket, + Serde<K, Slice> serdeKey, + Serde<V, Slice> serdeValue) + { + identifierGenerator.register(identifier); + SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store, + identifier, bucket, serdeKey, serdeValue); + componentList.add(map); + return map; + } + + public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde) + { + throw new UnsupportedOperationException("Unsupported Operation"); + } + + public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde) + { + throw new UnsupportedOperationException("Unsupported 
Operation"); + } + + public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde) + { + throw new UnsupportedOperationException("Unsupported Operation"); + } + + public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde) + { + throw new UnsupportedOperationException("Unsupported Operation"); + } + + @Override + public void setup(Context.OperatorContext context) + { + store.setup(context); + for (SpillableComponent spillableComponent: componentList) { + spillableComponent.setup(context); + } + } + + @Override + public void beginWindow(long windowId) + { + store.beginWindow(windowId); + for (SpillableComponent spillableComponent: componentList) { + spillableComponent.beginWindow(windowId); + } + } + + @Override + public void endWindow() + { + for (SpillableComponent spillableComponent: componentList) { + spillableComponent.endWindow(); + } + store.endWindow(); + } + + @Override + public void teardown() + { + for (SpillableComponent spillableComponent: componentList) { + spillableComponent.teardown(); + } + store.teardown(); + } + + @Override + public void beforeCheckpoint(long l) + { + store.beforeCheckpoint(l); + } + + @Override + public void checkpointed(long l) + { + store.checkpointed(l); + } + + @Override + public void committed(long l) + { + store.committed(l); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java new file mode 100644 index 0000000..17a52f0 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java @@ -0,0 +1,41 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Classes implementing this interface can be used as generators for identifiers for Spillable data structures. This is + * mainly used in implementations of {@link SpillableComplexComponent}. + */ +@InterfaceStability.Evolving +public interface SpillableIdentifierGenerator +{ + /** + * Generates the next valid identifier for a Spillable data structure. + * @return A byte array which represents the next valid identifier for a Spillable data structure. + */ + byte[] next(); + + /** + * Registers the given identifier with this {@link SpillableIdentifierGenerator}. + * @param identifier The identifier to register with this {@link SpillableIdentifierGenerator}.
+ */ + void register(byte[] identifier); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java new file mode 100644 index 0000000..1db0eeb --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.apache.apex.malhar.lib.state.BucketedState; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Component; +import com.datatorrent.api.Context; +import com.datatorrent.api.Operator; + +/** + * Implementations of this interface are used by Spillable datastructures to spill data to disk. 
+ */ [email protected] +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>, + Operator.CheckpointNotificationListener, WindowListener +{ +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java new file mode 100644 index 0000000..025c501 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * A simple priority queue where the priority of an object is determined by the time at which it is inserted into the + * queue. The object in the queue with the smallest time stamp is the first to be dequeued. + * <p> + * Every timestamp assigned by a queue is strictly greater than the previous one, even when several objects are + * inserted within the same millisecond. This is required because entries are kept in a {@link java.util.TreeSet} + * ordered only by {@link TimeWrapper#compareTo}, and a {@link java.util.TreeSet} treats two elements that compare as + * 0 as duplicates: with equal timestamps an insert would be silently dropped and a remove could delete the wrong entry. + * @param <T> The type of the objects inserted into the queue. + */ +@InterfaceStability.Evolving +public class TimeBasedPriorityQueue<T> +{ + private Map<T, TimeWrapper<T>> timeWrappperMap = Maps.newHashMap(); + private Set<TimeWrapper<T>> sortedTimestamp = Sets.newTreeSet(); + + /** + * The last timestamp assigned by {@link #nextTimestamp()}. + */ + private long lastTimestamp; + + /** + * Inserts the given value, or refreshes its timestamp if it is already queued, so that it becomes the most recently + * used entry. + * @param value The value to insert or refresh. + */ + public void upSert(T value) + { + TimeWrapper<T> timeWrapper = timeWrappperMap.get(value); + long timestamp = nextTimestamp(); + + if (timeWrapper != null) { + // Remove before changing the timestamp, otherwise the TreeSet can no longer locate the entry. + sortedTimestamp.remove(timeWrapper); + timeWrapper.setTimestamp(timestamp); + } else { + timeWrapper = new TimeWrapper<>(value, timestamp); + timeWrappperMap.put(value, timeWrapper); + } + + sortedTimestamp.add(timeWrapper); + } + + /** + * Removes the given value from the queue. Values that are not queued are ignored. + * @param value The value to remove. + */ + public void remove(T value) + { + TimeWrapper<T> timeWrapper = timeWrappperMap.remove(value); + + // TreeSet.remove(null) throws a NullPointerException, so skip values that were never queued. + if (timeWrapper != null) { + sortedTimestamp.remove(timeWrapper); + } + } + + /** + * Removes the given number of least recently upserted values from the queue. + * @param count The number of values to remove. + * @return The removed values. + * @throws IllegalArgumentException if count is not positive or exceeds the number of queued values. + */ + public Set<T> removeLRU(int count) + { + Preconditions.checkArgument(count > 0 && count <= timeWrappperMap.size()); + + Iterator<TimeWrapper<T>> iterator = sortedTimestamp.iterator(); + Set<T> valueSet = Sets.newHashSet(); + + for (int counter = 0; counter < count; counter++) { + T value = iterator.next().getKey(); + valueSet.add(value); + timeWrappperMap.remove(value); + iterator.remove(); + } + + return valueSet; + } + + /** + * Returns a timestamp strictly greater than any previously returned by this queue. This is the current wall-clock + * time unless that is not greater than the last timestamp (same millisecond, or the clock moved backwards), in which + * case the last timestamp plus one is used. + * @return The next timestamp. + */ + private long nextTimestamp() + { + lastTimestamp = Math.max(System.currentTimeMillis(), lastTimestamp + 1); + return lastTimestamp; + } + + protected static class TimeWrapper<T> implements Comparable<TimeWrapper<T>> + { + private T key; + private long timestamp; + + public
TimeWrapper(T key, long timestamp) + { + this.key = Preconditions.checkNotNull(key); + this.timestamp = timestamp; + } + + public T getKey() + { + return key; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { + this.timestamp = timestamp; + } + + /** + * Orders wrappers by timestamp only. This is not consistent with {@link #equals(Object)}, which compares keys; + * {@link TimeBasedPriorityQueue} assigns distinct timestamps to distinct keys so the two agree inside the queue. + */ + @Override + public int compareTo(TimeWrapper<T> timeWrapper) + { + return Long.compare(this.timestamp, timeWrapper.getTimestamp()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + TimeWrapper<?> that = (TimeWrapper<?>)o; + + return key.equals(that.key); + } + + @Override + public int hashCode() + { + return key.hashCode(); + } + + @Override + public String toString() + { + return "TimeWrapper{" + + "key=" + key + + ", timestamp=" + timestamp + + '}'; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(TimeBasedPriorityQueue.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java new file mode 100644 index 0000000..fcf219d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +/** + * This is an LRU cache with a maximum size. When the cache size is exceeded, the excess elements are kept in the + * cache until the end of the window. When the end of the window is reached, the least recently used entries are + * evicted from the cache. + * @param <K> The type of the keys. + * @param <V> The type of the values. 
+ */ [email protected] +public class WindowBoundedMapCache<K, V> +{ + public static final int DEFAULT_MAX_SIZE = 50000; + + private int maxSize = DEFAULT_MAX_SIZE; + + private Map<K, V> cache = Maps.newHashMap(); + + private Set<K> changedKeys = Sets.newHashSet(); + private Set<K> removedKeys = Sets.newHashSet(); + private TimeBasedPriorityQueue<K> priorityQueue = new TimeBasedPriorityQueue<>(); + + public WindowBoundedMapCache() + { + } + + public WindowBoundedMapCache(int maxSize) + { + Preconditions.checkArgument(maxSize > 0); + + this.maxSize = maxSize; + } + + public void put(K key, V value) + { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + + removedKeys.remove(key); + changedKeys.add(key); + priorityQueue.upSert(key); + + cache.put(key, value); + } + + public V get(K key) + { + Preconditions.checkNotNull(key); + + return cache.get(key); + } + + public boolean contains(K key) + { + return cache.containsKey(key); + } + + public void remove(K key) + { + Preconditions.checkNotNull(key); + + if (!cache.containsKey(key)) { + return; + } + + cache.remove(key); + changedKeys.remove(key); + removedKeys.add(key); + priorityQueue.remove(key); + } + + public Set<K> getChangedKeys() + { + return changedKeys; + } + + public Set<K> getRemovedKeys() + { + return removedKeys; + } + + /* + Note: beginWindow is intentionally not implemented because many users need a cache that does not require + beginWindow to be called. 
+ */ + + public void endWindow() + { + int count = cache.size() - maxSize; + + if (count > 0) { + Set<K> expiredKeys = priorityQueue.removeLRU(count); + + for (K expiredKey: expiredKeys) { + cache.remove(expiredKey); + } + } + + changedKeys = Sets.newHashSet(); + removedKeys = Sets.newHashSet(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java new file mode 100644 index 0000000..fa8cd9f --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable; + +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.api.Operator; + +/** + * This interface represents components which need to listen to the operator {@link Operator#beginWindow(long)} and + * {@link Operator#endWindow()} callbacks. + */ [email protected] +public interface WindowListener +{ + /** + * This is called when the parent {@link Operator}'s {@link Operator#beginWindow(long)} callback is called. + * @param windowId The id of the current application window. + */ + void beginWindow(long windowId); + + /** + * This is called when the parent {@link Operator}'s {@link Operator#endWindow()} callback is called. + */ + void endWindow(); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java deleted file mode 100644 index fa7bf08..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.spillable.inmem; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Set; - -import javax.annotation.Nullable; - -import org.apache.apex.malhar.lib.state.spillable.Spillable; - -import com.esotericsoftware.kryo.serializers.FieldSerializer; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.collect.HashMultiset; - -/** - * An in memory implementation of the {@link Spillable.SpillableByteMultiset} interface. - * @param <T> The type of the data stored in the {@link InMemMultiset} - */ -public class InMemMultiset<T> implements Spillable.SpillableByteMultiset<T> -{ - @FieldSerializer.Bind(JavaSerializer.class) - private HashMultiset<T> multiset = HashMultiset.create(); - - @Override - public int count(@Nullable Object element) - { - return multiset.count(element); - } - - @Override - public int add(@Nullable T element, int occurrences) - { - return multiset.add(element, occurrences); - } - - @Override - public int remove(@Nullable Object element, int occurrences) - { - return multiset.remove(element, occurrences); - } - - @Override - public int setCount(T element, int count) - { - return multiset.setCount(element, count); - } - - @Override - public boolean setCount(T element, int oldCount, int newCount) - { - return multiset.setCount(element, oldCount, newCount); - } - - @Override - public Set<T> elementSet() - { - return multiset.elementSet(); - } - - @Override - public Set<Entry<T>> entrySet() - { - return multiset.entrySet(); - } - - 
@Override - public Iterator<T> iterator() - { - return multiset.iterator(); - } - - @Override - public Object[] toArray() - { - return multiset.toArray(); - } - - @Override - public <T1> T1[] toArray(T1[] t1s) - { - return multiset.toArray(t1s); - } - - @Override - public int size() - { - return multiset.size(); - } - - @Override - public boolean isEmpty() - { - return multiset.isEmpty(); - } - - @Override - public boolean contains(@Nullable Object element) - { - return multiset.contains(element); - } - - @Override - public boolean containsAll(Collection<?> es) - { - return multiset.containsAll(es); - } - - @Override - public boolean addAll(Collection<? extends T> collection) - { - return multiset.addAll(collection); - } - - @Override - public boolean add(T element) - { - return multiset.add(element); - } - - @Override - public boolean remove(@Nullable Object element) - { - return multiset.remove(element); - } - - @Override - public boolean removeAll(Collection<?> c) - { - return multiset.removeAll(c); - } - - @Override - public boolean retainAll(Collection<?> c) - { - return multiset.retainAll(c); - } - - @Override - public void clear() - { - multiset.clear(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java deleted file mode 100644 index 9742537..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.spillable.inmem; - -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; - -import org.apache.apex.malhar.lib.state.spillable.Spillable; - -import com.google.common.collect.Lists; - -/** - * An in memory implementation of the {@link Spillable.SpillableArrayList} interface. 
- * @param <T> The type of the data stored in the {@link InMemSpillableArrayList} - */ -public class InMemSpillableArrayList<T> implements Spillable.SpillableArrayList<T> -{ - private List<T> list = Lists.newArrayList(); - - @Override - public int size() - { - return list.size(); - } - - @Override - public boolean isEmpty() - { - return list.isEmpty(); - } - - @Override - public boolean contains(Object o) - { - return list.contains(o); - } - - @Override - public Iterator<T> iterator() - { - return list.iterator(); - } - - @Override - public Object[] toArray() - { - return list.toArray(); - } - - @Override - public <T1> T1[] toArray(T1[] t1s) - { - return list.toArray(t1s); - } - - @Override - public boolean add(T t) - { - return list.add(t); - } - - @Override - public boolean remove(Object o) - { - return list.remove(o); - } - - @Override - public boolean containsAll(Collection<?> collection) - { - return list.containsAll(collection); - } - - @Override - public boolean addAll(Collection<? extends T> collection) - { - return list.addAll(collection); - } - - @Override - public boolean addAll(int i, Collection<? 
extends T> collection) - { - return list.addAll(i, collection); - } - - @Override - public boolean removeAll(Collection<?> collection) - { - return list.removeAll(collection); - } - - @Override - public boolean retainAll(Collection<?> collection) - { - return list.retainAll(collection); - } - - @Override - public void clear() - { - list.clear(); - } - - @Override - public T get(int i) - { - return list.get(i); - } - - @Override - public T set(int i, T t) - { - return list.set(i, t); - } - - @Override - public void add(int i, T t) - { - list.add(i, t); - } - - @Override - public T remove(int i) - { - return list.remove(i); - } - - @Override - public int indexOf(Object o) - { - return list.indexOf(o); - } - - @Override - public int lastIndexOf(Object o) - { - return list.lastIndexOf(o); - } - - @Override - public ListIterator<T> listIterator() - { - return list.listIterator(); - } - - @Override - public ListIterator<T> listIterator(int i) - { - return list.listIterator(i); - } - - @Override - public List<T> subList(int i, int i1) - { - return list.subList(i, i1); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java deleted file mode 100644 index 8376bd5..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.state.spillable.inmem; - -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -import org.apache.apex.malhar.lib.state.spillable.Spillable; - -import com.esotericsoftware.kryo.serializers.FieldSerializer; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multiset; - -/** - * An in memory implementation of the {@link Spillable.SpillableByteArrayListMultimap} interface. 
- * @param <K> The type of the keys stored in the {@link InMemSpillableByteArrayListMultimap} - * @param <V> The type of the values stored in the {@link InMemSpillableByteArrayListMultimap} - */ -public class InMemSpillableByteArrayListMultimap<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V> -{ - @FieldSerializer.Bind(JavaSerializer.class) - private ListMultimap<K, V> multimap = ArrayListMultimap.create(); - - @Override - public List<V> get(@Nullable K key) - { - return multimap.get(key); - } - - @Override - public Set<K> keySet() - { - return multimap.keySet(); - } - - @Override - public Multiset<K> keys() - { - return multimap.keys(); - } - - @Override - public Collection<V> values() - { - return multimap.values(); - } - - @Override - public Collection<Map.Entry<K, V>> entries() - { - return multimap.entries(); - } - - @Override - public List<V> removeAll(@Nullable Object key) - { - return multimap.removeAll(key); - } - - @Override - public void clear() - { - multimap.clear(); - } - - @Override - public int size() - { - return multimap.size(); - } - - @Override - public boolean isEmpty() - { - return multimap.isEmpty(); - } - - @Override - public boolean containsKey(@Nullable Object key) - { - return multimap.containsKey(key); - } - - @Override - public boolean containsValue(@Nullable Object value) - { - return multimap.containsValue(value); - } - - @Override - public boolean containsEntry(@Nullable Object key, @Nullable Object value) - { - return multimap.containsEntry(key, value); - } - - @Override - public boolean put(@Nullable K key, @Nullable V value) - { - return multimap.put(key, value); - } - - @Override - public boolean remove(@Nullable Object key, @Nullable Object value) - { - return multimap.remove(key, value); - } - - @Override - public boolean putAll(@Nullable K key, Iterable<? extends V> values) - { - return multimap.putAll(key, values); - } - - @Override - public boolean putAll(Multimap<? extends K, ? 
extends V> m) - { - return multimap.putAll(m); - } - - @Override - public List<V> replaceValues(K key, Iterable<? extends V> values) - { - return multimap.replaceValues(key, values); - } - - @Override - public Map<K, Collection<V>> asMap() - { - return multimap.asMap(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java deleted file mode 100644 index 25e8b2c..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.apex.malhar.lib.state.spillable.inmem; - -import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent; -import org.apache.apex.malhar.lib.utils.serde.Serde; - -import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.Slice; - -/** - * An in memory implementation {@link SpillableComplexComponent} - */ -public class InMemSpillableComplexComponent implements SpillableComplexComponent -{ - @Override - public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde) - { - return new InMemSpillableArrayList<>(); - } - - @Override - public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, - Serde<T, Slice> serde) - { - return new InMemSpillableArrayList<>(); - } - - @Override - public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey, - Serde<V, Slice> serdeValue) - { - throw new UnsupportedOperationException(); - } - - @Override - public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, - Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) - { - throw new UnsupportedOperationException(); - } - - @Override - public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, - Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) - { - return new InMemSpillableByteArrayListMultimap<>(); - } - - @Override - public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, - long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) - { - return new InMemSpillableByteArrayListMultimap<>(); - } - - @Override - public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde) - { - return new InMemMultiset<>(); - } - - @Override - public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, - Serde<T, Slice> serde) - { - return new InMemMultiset<>(); - } - - 
@Override - public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde) - { - throw new UnsupportedOperationException(); - } - - @Override - public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde) - { - throw new UnsupportedOperationException(); - } - - @Override - public void setup(Context.OperatorContext context) - { - } - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - - @Override - public void teardown() - { - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java new file mode 100644 index 0000000..0e65344 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.spillable.inmem; + +import java.util.Map; +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Context; +import com.datatorrent.netlet.util.Slice; + +/** + * A simple in memory implementation of a {@link SpillableStateStore} backed by a {@link Map}. + */ [email protected] +public class InMemSpillableStateStore implements SpillableStateStore +{ + private Map<Long, Map<Slice, Slice>> store = Maps.newHashMap(); + + @Override + public void setup(Context.OperatorContext context) + { + + } + + @Override + public void beginWindow(long windowId) + { + + } + + @Override + public void endWindow() + { + + } + + @Override + public void teardown() + { + + } + + @Override + public void put(long bucketId, @NotNull Slice key, @NotNull Slice value) + { + Map<Slice, Slice> bucket = store.get(bucketId); + + if (bucket == null) { + bucket = Maps.newHashMap(); + store.put(bucketId, bucket); + } + + bucket.put(key, value); + } + + @Override + public Slice getSync(long bucketId, @NotNull Slice key) + { + Map<Slice, Slice> bucket = store.get(bucketId); + + if (bucket == null) { + bucket = Maps.newHashMap(); + store.put(bucketId, bucket); + } + + return bucket.get(key); + } + + @Override + public Future<Slice> getAsync(long bucketId, @NotNull Slice key) + { + throw new UnsupportedOperationException(); + } + + @Override + public void beforeCheckpoint(long l) + { + } + + @Override + public void checkpointed(long l) + { + } + + @Override + public void committed(long l) + { + } + + @Override + public String toString() + { + return store.toString(); + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java new file mode 100644 index 0000000..6d68acc --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.spillable.managed; + +import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl; +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore; + +import com.esotericsoftware.kryo.DefaultSerializer; +import com.esotericsoftware.kryo.serializers.FieldSerializer; + +@DefaultSerializer(FieldSerializer.class) +public class ManagedStateSpillableStateStore extends ManagedStateImpl implements SpillableStateStore +{ + public ManagedStateSpillableStateStore() + { + super(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java index 85c34d9..9669981 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java @@ -19,6 +19,7 @@ package org.apache.apex.malhar.lib.utils.serde; import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; /** * This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned. 
@@ -26,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt; * * @since 3.4.0 */ [email protected] public class PassThruByteArraySerde implements Serde<byte[], byte[]> { @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java new file mode 100644 index 0000000..436e7f8 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; + +import com.datatorrent.netlet.util.Slice; + +/** + * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. 
A byte array is + * serialized by simply wrapping it in a {@link Slice} object and deserialized by simply reading the byte array + * out of the {@link Slice} object. + * + * <b>Note:</b> The deserialized method doesn't use the offset argument in this implementation. + */ +public class PassThruByteArraySliceSerde implements Serde<byte[], Slice> +{ + @Override + public Slice serialize(byte[] object) + { + return new Slice(object); + } + + @Override + public byte[] deserialize(Slice object, MutableInt offset) + { + offset.add(object.length); + + if (object.offset == 0) { + return object.buffer; + } + + byte[] bytes = new byte[object.length]; + System.arraycopy(object.buffer, object.offset, bytes, 0, object.length); + return bytes; + } + + @Override + public byte[] deserialize(Slice object) + { + return deserialize(object, new MutableInt(0)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java new file mode 100644 index 0000000..f9d93b3 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.netlet.util.Slice; + +/** + * This is a {@link Serde} implementation which simply allows an input slice to pass through. No serialization or + * deserialization transformation is performed on the input {@link Slice}s. + */ [email protected] +public class PassThruSliceSerde implements Serde<Slice, Slice> +{ + @Override + public Slice serialize(Slice object) + { + return object; + } + + @Override + public Slice deserialize(Slice object, MutableInt offset) + { + return object; + } + + @Override + public Slice deserialize(Slice object) + { + return object; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java new file mode 100644 index 0000000..c18af33 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.utils.serde; + +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.hadoop.classification.InterfaceStability; + +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.netlet.util.Slice; + +/** + * This is an implementation of {@link Serde} which deserializes and serializes integers. + */ [email protected] +public class SerdeIntSlice implements Serde<Integer, Slice> +{ + @Override + public Slice serialize(Integer object) + { + return new Slice(GPOUtils.serializeInt(object)); + } + + @Override + public Integer deserialize(Slice slice, MutableInt offset) + { + int val = GPOUtils.deserializeInt(slice.buffer, new MutableInt(slice.offset + offset.intValue())); + offset.add(4); + return val; + } + + @Override + public Integer deserialize(Slice object) + { + return deserialize(object, new MutableInt(0)); + } +}
