APEXMALHAR-1897 added managed state
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a8fbcac6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a8fbcac6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a8fbcac6 Branch: refs/heads/devel-3 Commit: a8fbcac6236e4130cef1e83830e944c4788bbca4 Parents: 5373a3c Author: Chandni Singh <[email protected]> Authored: Sun Dec 13 03:13:08 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Fri Mar 25 00:04:04 2016 -0700 ---------------------------------------------------------------------- .../datatorrent/lib/fileaccess/FileAccess.java | 3 + .../lib/fileaccess/FileAccessFSImpl.java | 11 +- .../apex/malhar/lib/state/BucketedState.java | 72 +++ .../lib/state/TimeSlicedBucketedState.java | 104 ++++ .../state/managed/AbstractManagedStateImpl.java | 583 +++++++++++++++++++ .../apex/malhar/lib/state/managed/Bucket.java | 525 +++++++++++++++++ .../lib/state/managed/BucketsFileSystem.java | 566 ++++++++++++++++++ .../managed/IncrementalCheckpointManager.java | 213 +++++++ .../malhar/lib/state/managed/ManagedState.java | 32 + .../state/managed/ManagedStateComponent.java | 36 ++ .../lib/state/managed/ManagedStateContext.java | 38 ++ .../lib/state/managed/ManagedStateImpl.java | 103 ++++ .../lib/state/managed/ManagedTimeStateImpl.java | 103 ++++ .../managed/ManagedTimeUnifiedStateImpl.java | 213 +++++++ .../malhar/lib/state/managed/StateTracker.java | 194 ++++++ .../lib/state/managed/TimeBucketAssigner.java | 242 ++++++++ .../malhar/lib/state/managed/package-info.java | 22 + .../apex/malhar/lib/state/package-info.java | 22 + .../state/managed/BucketsFileSystemTest.java | 166 ++++++ .../lib/state/managed/DefaultBucketTest.java | 203 +++++++ .../IncrementalCheckpointManagerTest.java | 196 +++++++ .../lib/state/managed/ManagedStateImplTest.java | 182 ++++++ .../state/managed/ManagedStateTestUtils.java | 141 +++++ 
.../state/managed/ManagedTimeStateImplTest.java | 151 +++++ .../ManagedTimeUnifiedStateImplTest.java | 149 +++++ .../state/managed/MockManagedStateContext.java | 91 +++ .../lib/state/managed/StateTrackerTest.java | 174 ++++++ .../state/managed/TimeBucketAssignerTest.java | 123 ++++ 28 files changed, 4654 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java index d4c7810..f8dd0be 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java @@ -52,8 +52,11 @@ public interface FileAccess extends Closeable * @throws IOException */ void rename(long bucketKey, String oldName, String newName) throws IOException; + void delete(long bucketKey, String fileName) throws IOException; + void deleteBucket(long bucketKey) throws IOException; + long getFileSize(long bucketKey, String s) throws IOException; /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java index 74ab238..a9cfe00 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java @@ -135,10 +135,13 @@ public abstract class FileAccessFSImpl implements FileAccess public 
RemoteIterator<LocatedFileStatus> listFiles(long bucketKey) throws IOException { Path bucketPath = getBucketPath(bucketKey); - if (!fs.exists(bucketPath)) { - return null; - } - return fs.listFiles(bucketPath, true); + return fs.exists(bucketPath) ? fs.listFiles(bucketPath, true) : null; + } + + @Override + public void deleteBucket(long bucketKey) throws IOException + { + fs.delete(getBucketPath(bucketKey), true); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java new file mode 100644 index 0000000..a270eb6 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.state; + +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.netlet.util.Slice; + +/** + * A state where keys are grouped in buckets. + */ +public interface BucketedState +{ + /** + * An expired value. In some implementations where bucketId is time related then the event can be old and + * the get methods- getSync & getAsync return this fixed slice instance.<br/> + * In the usages, comparisons with EXPIRED should be made using <code>==</code> instead of <code>equals</code>. + */ + Slice EXPIRED = new Slice(null, -1, -1); + + /** + * Sets the value of the key in bucket identified by bucketId. + * + * @param bucketId identifier of the bucket. + * @param key key (not null) + * @param value value (not null) + */ + void put(long bucketId, @NotNull Slice key, @NotNull Slice value); + + /** + * Returns the value of the key in a bucket identified by bucketId. Fetching a key can be expensive if the key + * is not in memory and is present on disk. This fetches the key synchronously. <br/> + * {@link #getAsync(long, Slice)} is recommended for efficiently reading the value of a key. + * + * @param bucketId identifier of the bucket. + * @param key key (not null) + * + * @return value of the key if found; null if the key is not found; + * {@link #EXPIRED} if the bucketId is time based and very old. + */ + Slice getSync(long bucketId, @NotNull Slice key); + + /** + * Returns the future using which the value is obtained.<br/> + * + * @param key key (not null) + * + * @return value of the key if found; null if the key is not found; + * {@link #EXPIRED} if the bucketId is time based and very old. 
+ */ + Future<Slice> getAsync(long bucketId, @NotNull Slice key); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java new file mode 100644 index 0000000..55b92a3 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state; + +import java.util.concurrent.Future; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.netlet.util.Slice; + +/** + * A type of bucketed state where a bucket's data is further divided into time buckets. This requires + * time per key to figure out which time bucket a particular key belongs to. + * <p/> + * The time here is mainly used for purging of aged key/value pair. 
+ */ +public interface TimeSlicedBucketedState +{ + /** + * Sets the value of a key in the bucket identified by bucketId. Time is used to derive which time bucket (within + * the main bucket) a key belongs to. + * + * @param bucketId identifier of the bucket. + * @param time time associated with the key. + * @param key key (not null) + * @param value value (not null) + */ + void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value); + + /** + * Returns the value of the key in the bucket identified by bucketId.<br/> + * If the value of the key is not present in the bucket cache then this scans all the time bucket files on disk from + * the latest to the oldest. + * + * It retrieves the value synchronously that can be expensive.<br/> + * {@link #getAsync(long, Slice)} is recommended for efficient reading the value of a key. + * + * + * @param bucketId identifier of the bucket + * @param key key (not null) + * + * @return value of the key if found; null if the key is not found; + */ + Slice getSync(long bucketId, @NotNull Slice key); + + + /** + * Returns the value of key in the bucket identified by bucketId.<br/> + * If the value of the key is not present in the bucket cache then this will use the time to derive the time + * bucket and just search for the key in a particular time bucket file.<br/> + * + * It retrieves the value synchronously which can be expensive.<br/> + * {@link #getAsync(long, long, Slice)} is recommended for efficiently reading the value of a key. + * + * @param bucketId identifier of the bucket. + * @param time time for deriving the time bucket. + * @param key key (not null) + * + * @return value of the key if found; null if the key is not found; {@link BucketedState#EXPIRED} if the time is old. 
+ */ + Slice getSync(long bucketId, long time, @NotNull Slice key); + + /** + * Returns the future using which the value is obtained.<br/> + * If the value of the key is not present in the bucket cache then this searches for it in all the time buckets on + * disk.<br/> + * Time-buckets are looked-up in order from newest to oldest. + * + * @param bucketId identifier of the bucket. + * @param key key (not null) + * + * @return value of the key if found; null if the key is not found; + */ + Future<Slice> getAsync(long bucketId, @NotNull Slice key); + + /** + * Returns the future using which the value is obtained.<br/> + * If the value of the key is not present in the bucket cache then this will use the time to derive the time + * bucket and just search for the key in a particular time bucket file.<br/> + * + * @param bucketId identifier of the bucket. + * @param time time associated with the key. + * @param key key (not null) + * + * @return value of the key if found; null if the key is not found; {@link BucketedState#EXPIRED} if time is very old. + */ + Future<Slice> getAsync(long bucketId, long time, @NotNull Slice key); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java new file mode 100644 index 0000000..11db44d --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -0,0 +1,583 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; + +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.esotericsoftware.kryo.serializers.FieldSerializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimaps; +import com.google.common.util.concurrent.Futures; + +import com.datatorrent.api.Component; +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.NameableThreadFactory; +import com.datatorrent.lib.fileaccess.FileAccess; +import 
com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.lib.util.comparator.SliceComparator; +import com.datatorrent.netlet.util.Slice; + +/** + * An abstract implementation of managed state.<br/> + * + * The important sub-components here are: + * <ol> + * <li> + * {@link #checkpointManager}: writes incremental checkpoints in window files and transfers data from window + * files to bucket files. + * </li> + * <li> + * {@link #bucketsFileSystem}: manages writing/reading from all the buckets. A bucket on disk is further sub-divided + * into time-buckets. This abstracts out updating time-buckets and meta files and reading from them. + * </li> + * <li> + * {@link #timeBucketAssigner}: assigns time-buckets to keys and manages the time boundaries. + * </li> + * <li> + * {@link #stateTracker}: tracks the size of data in memory and requests buckets to free memory when enough memory + * is not available. + * </li> + * <li> + * {@link #fileAccess}: plug-able file system abstraction. + * </li> + * </ol> + * <p/> + * <b>Differences between different concrete implementations of {@link AbstractManagedStateImpl}</b> + * <table> + * <tr> + * <td></td> + * <td>{@link ManagedStateImpl}</td> + * <td>{@link ManagedTimeStateImpl}</td> + * <td>{@link ManagedTimeUnifiedStateImpl}</td> + * </tr> + * <tr> + * <td>Main buckets</td> + * <td>identified by unique adhoc long ids that the user provides with the key.</td> + * <td>same as ManagedStateImpl.</td> + * <td>user doesn't provide bucket ids and instead just provides time. 
Time is used to derive the time buckets + * and these are the main buckets.</td> + * </tr> + * <tr> + * <td>Data on disk: data in buckets is persisted on disk with each bucket data further divided into + * time-buckets, i.e., {base_path}/{bucketId}/{time-bucket id}</td> + * <td>time-bucket is computed using the system time corresponding to the application window.</td> + * <td>time-bucket is derived from the user provided time.</td> + * <td>time-bucket is derived from the user provided time. + * In this implementation operator id is used to isolate data of different partitions on disk, i.e., + * {base_path}/{operatorId}/{time-bucket id}</td> + * </tr> + * <tr> + * <td>Bucket partitioning</td> + * <td>bucket belongs to just one partition. Multiple partitions cannot write to the same bucket.</td> + * <td>same as ManagedStateImpl.</td> + * <td>multiple partitions can be working with the same time-bucket since time-bucket is derived from time. + * This works because on disk each partition's data is segregated by the operator id.</td> + * </tr> + * <tr> + * <td>Dynamic partitioning</td> + * <td>can support dynamic partitioning by pre-allocating buckets.</td> + * <td>same as ManagedStateImpl.</td> + * <td>will not be able to support dynamic partitioning efficiently.</td> + * </tr> + * </table> + * + */ +public abstract class AbstractManagedStateImpl + implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext, + TimeBucketAssigner.PurgeListener +{ + private long maxMemorySize; + + protected int numBuckets; + + @NotNull + private FileAccess fileAccess = new TFileImpl.DTFileImpl(); + @NotNull + protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner(); + + protected Bucket[] buckets; + + @Min(1) + private int numReaders = 1; + @NotNull + protected transient ExecutorService readerService; + + @NotNull + protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager(); + + 
@NotNull + protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem(); + + protected transient OperatorContext operatorContext; + + @NotNull + protected Comparator<Slice> keyComparator = new SliceComparator(); + + protected final transient AtomicReference<Throwable> throwable = new AtomicReference<>(); + + @NotNull + @FieldSerializer.Bind(JavaSerializer.class) + private Duration checkStateSizeInterval = Duration.millis( + DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue); + + @FieldSerializer.Bind(JavaSerializer.class) + private Duration durationPreventingFreeingSpace; + + private transient StateTracker stateTracker = new StateTracker(); + + //accessible to StateTracker + final transient Object commitLock = new Object(); + + protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId = + Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create()); + + @Override + public void setup(OperatorContext context) + { + operatorContext = context; + fileAccess.init(); + + timeBucketAssigner.setPurgeListener(this); + + //setup all the managed state components + timeBucketAssigner.setup(this); + checkpointManager.setup(this); + bucketsFileSystem.setup(this); + + if (buckets == null) { + //create buckets array only once at start when it is not created. + numBuckets = getNumBuckets(); + buckets = new Bucket[numBuckets]; + } + for (Bucket bucket : buckets) { + if (bucket != null) { + bucket.setup(this); + } + } + + stateTracker.setup(this); + long activationWindow = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID); + + if (activationWindow != Stateless.WINDOW_ID) { + //delete all the wal files with windows > activationWindow. + //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data. 
+ try { + for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) { + if (recoveredWindow <= activationWindow) { + @SuppressWarnings("unchecked") + Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>) + checkpointManager.load(operatorContext.getId(), recoveredWindow); + if (recoveredData != null && !recoveredData.isEmpty()) { + for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) { + int bucketIdx = prepareBucket(entry.getKey()); + buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue()); + } + } + checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow, + true /*skipWritingToWindowFile*/); + + } else { + checkpointManager.delete(operatorContext.getId(), recoveredWindow); + } + } + } catch (IOException e) { + throw new RuntimeException("recovering", e); + } + } + + readerService = Executors.newFixedThreadPool(numReaders, new NameableThreadFactory("managedStateReaders")); + } + + /** + * Gets the number of buckets which is required during setup to create the array of buckets.<br/> + * {@link ManagedTimeStateImpl} provides num of buckets which is injected using a property.<br/> + * {@link ManagedTimeUnifiedStateImpl} provides num of buckets which are calculated based on time settings. + * + * @return number of buckets. + */ + public abstract int getNumBuckets(); + + public void beginWindow(long windowId) + { + if (throwable.get() != null) { + Throwables.propagate(throwable.get()); + } + timeBucketAssigner.beginWindow(windowId); + } + + + /** + * Prepares the bucket and returns its index. 
+ * @param bucketId bucket key + * @return bucket index + */ + protected int prepareBucket(long bucketId) + { + stateTracker.bucketAccessed(bucketId); + int bucketIdx = getBucketIdx(bucketId); + + Bucket bucket = buckets[bucketIdx]; + if (bucket == null) { + //bucket is not in memory + bucket = newBucket(bucketId); + bucket.setup(this); + buckets[bucketIdx] = bucket; + } else if (bucket.getBucketId() != bucketId) { + handleBucketConflict(bucketIdx, bucketId); + } + return bucketIdx; + } + + protected void putInBucket(long bucketId, long timeBucket, @NotNull Slice key, @NotNull Slice value) + { + Preconditions.checkNotNull(key, "key"); + Preconditions.checkNotNull(value, "value"); + if (timeBucket != -1) { + //time bucket is invalid data is not stored + int bucketIdx = prepareBucket(bucketId); + buckets[bucketIdx].put(key, timeBucket, value); + } + } + + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key) + { + Preconditions.checkNotNull(key, "key"); + int bucketIdx = prepareBucket(bucketId); + Bucket bucket = buckets[bucketIdx]; + synchronized (bucket) { + return bucket.get(key, timeBucket, Bucket.ReadSource.ALL); + } + } + + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key) + { + Preconditions.checkNotNull(key, "key"); + int bucketIdx = prepareBucket(bucketId); + Bucket bucket = buckets[bucketIdx]; + synchronized (bucket) { + Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY); + if (cachedVal != null) { + return Futures.immediateFuture(cachedVal); + } + ValueFetchTask valueFetchTask = new ValueFetchTask(bucket, key, timeBucket, this); + tasksPerBucketId.put(bucket.getBucketId(), valueFetchTask); + return readerService.submit(valueFetchTask); + } + } + + protected void handleBucketConflict(int bucketIdx, long 
newBucketId) + { + throw new IllegalArgumentException("bucket conflict " + buckets[bucketIdx].getBucketId() + " " + newBucketId); + } + + protected int getBucketIdx(long bucketId) + { + return (int)(bucketId % numBuckets); + } + + Bucket getBucket(long bucketId) + { + return buckets[getBucketIdx(bucketId)]; + } + + protected Bucket newBucket(long bucketId) + { + return new Bucket.DefaultBucket(bucketId); + } + + public void endWindow() + { + timeBucketAssigner.endWindow(); + } + + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + @Override + public void beforeCheckpoint(long windowId) + { + Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap(); + + for (Bucket bucket : buckets) { + if (bucket != null) { + synchronized (bucket) { + Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId); + if (!flashDataForBucket.isEmpty()) { + flashData.put(bucket.getBucketId(), flashDataForBucket); + } + } + } + } + if (!flashData.isEmpty()) { + try { + checkpointManager.save(flashData, operatorContext.getId(), windowId, false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void checkpointed(long windowId) + { + } + + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + @Override + public void committed(long windowId) + { + synchronized (commitLock) { + try { + for (Bucket bucket : buckets) { + if (bucket != null) { + synchronized (bucket) { + bucket.committed(windowId); + } + } + } + checkpointManager.committed(operatorContext.getId(), windowId); + } catch (IOException | InterruptedException e) { + throw new RuntimeException("committing " + windowId, e); + } + } + } + + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + @Override + public void teardown() + { + checkpointManager.teardown(); + bucketsFileSystem.teardown(); + timeBucketAssigner.teardown(); + readerService.shutdownNow(); + for (Bucket bucket : buckets) { + if 
(bucket != null) { + synchronized (bucket) { + bucket.teardown(); + } + } + } + stateTracker.teardown(); + } + + @Override + public void purgeTimeBucketsLessThanEqualTo(long timeBucket) + { + checkpointManager.setLatestExpiredTimeBucket(timeBucket); + } + + @Override + public OperatorContext getOperatorContext() + { + return operatorContext; + } + + @Override + public void setMaxMemorySize(long bytes) + { + maxMemorySize = bytes; + } + + /** + * + * @return the optimal size of the cache that triggers eviction of committed data from memory. + */ + public long getMaxMemorySize() + { + return maxMemorySize; + } + + /** + * Sets the {@link FileAccess} implementation. + * @param fileAccess specific implementation of FileAccess. + */ + public void setFileAccess(@NotNull FileAccess fileAccess) + { + this.fileAccess = Preconditions.checkNotNull(fileAccess); + } + + @Override + public FileAccess getFileAccess() + { + return fileAccess; + } + + /** + * Sets the time bucket assigner. This can be used for plugging any custom time bucket assigner. + * + * @param timeBucketAssigner a {@link TimeBucketAssigner} + */ + public void setTimeBucketAssigner(@NotNull TimeBucketAssigner timeBucketAssigner) + { + this.timeBucketAssigner = Preconditions.checkNotNull(timeBucketAssigner); + } + + @Override + public TimeBucketAssigner getTimeBucketAssigner() + { + return timeBucketAssigner; + } + + @Override + public Comparator<Slice> getKeyComparator() + { + return keyComparator; + } + + /** + * Sets the key comparator. The keys on the disk in time bucket files are sorted. This sets the comparator for the + * key. + * @param keyComparator key comparator + */ + public void setKeyComparator(@NotNull Comparator<Slice> keyComparator) + { + this.keyComparator = Preconditions.checkNotNull(keyComparator); + } + + public BucketsFileSystem getBucketsFileSystem() + { + return bucketsFileSystem; + } + + /** + * @return number of worker threads in the reader service. 
+ */ + public int getNumReaders() + { + return numReaders; + } + + /** + * Sets the number of worker threads in the reader service which is responsible for asynchronously fetching + * values of the keys. This should not exceed number of buckets. + * + * @param numReaders number of worker threads in the reader service. + */ + public void setNumReaders(int numReaders) + { + this.numReaders = numReaders; + } + + /** + * @return regular interval at which the size of state is checked. + */ + public Duration getCheckStateSizeInterval() + { + return checkStateSizeInterval; + } + + /** + * Sets the interval at which the size of state is regularly checked. + + * @param checkStateSizeInterval regular interval at which the size of state is checked. + */ + public void setCheckStateSizeInterval(@NotNull Duration checkStateSizeInterval) + { + this.checkStateSizeInterval = Preconditions.checkNotNull(checkStateSizeInterval); + } + + /** + * @return duration which prevents a bucket being evicted. + */ + public Duration getDurationPreventingFreeingSpace() + { + return durationPreventingFreeingSpace; + } + + /** + * Sets the duration which prevents buckets to free space. For example if this is set to an hour, then only + * buckets which were not accessed in last one hour will be triggered to free spaces. 
+ * + * @param durationPreventingFreeingSpace time duration + */ + public void setDurationPreventingFreeingSpace(Duration durationPreventingFreeingSpace) + { + this.durationPreventingFreeingSpace = durationPreventingFreeingSpace; + } + + static class ValueFetchTask implements Callable<Slice> + { + private final Bucket bucket; + private final long timeBucketId; + private final Slice key; + private final AbstractManagedStateImpl managedState; + + ValueFetchTask(@NotNull Bucket bucket, @NotNull Slice key, long timeBucketId, AbstractManagedStateImpl managedState) + { + this.bucket = Preconditions.checkNotNull(bucket); + this.timeBucketId = timeBucketId; + this.key = Preconditions.checkNotNull(key); + this.managedState = Preconditions.checkNotNull(managedState); + } + + @Override + public Slice call() throws Exception + { + try { + synchronized (bucket) { + //a particular bucket should only be handled by one thread at any point of time. Handling of bucket here + //involves creating readers for the time buckets and de-serializing key/value from a reader. 
+ Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL); + managedState.tasksPerBucketId.remove(bucket.getBucketId(), this); + return value; + } + } catch (Throwable t) { + managedState.throwable.set(t); + throw Throwables.propagate(t); + } + } + } + + @VisibleForTesting + void setStateTracker(@NotNull StateTracker stateTracker) + { + this.stateTracker = Preconditions.checkNotNull(stateTracker, "state tracker"); + } + + @VisibleForTesting + void setBucketsFileSystem(@NotNull BucketsFileSystem bucketsFileSystem) + { + this.bucketsFileSystem = Preconditions.checkNotNull(bucketsFileSystem, "buckets file system"); + } + + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateImpl.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java new file mode 100644 index 0000000..b2c1618 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -0,0 +1,525 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import com.datatorrent.lib.fileaccess.FileAccess; +import com.datatorrent.netlet.util.Slice; + +/** + * A bucket that groups events. + */ +public interface Bucket extends ManagedStateComponent +{ + /** + * @return bucket id + */ + long getBucketId(); + + /** + * + * @return size of bucket in memory. + */ + long getSizeInBytes(); + + /** + * Get value of a key. + * + * @param key key. + * @param timeBucket time bucket of the key if known; -1 otherwise. + * @param source source to read from + * @return value of the key. + */ + Slice get(Slice key, long timeBucket, ReadSource source); + + /** + * Set value of a key. + * + * @param key key. + * @param timeBucket timeBucket of the key. + * @param value value of the key. + */ + void put(Slice key, long timeBucket, Slice value); + + /** + * Triggers the bucket to checkpoint. Returns the non checkpointed data so far. + * + * @return non checkpointed data. + */ + Map<Slice, BucketedValue> checkpoint(long windowId); + + /** + * Triggers the bucket to commit data till provided window id. + * + * @param windowId window id + */ + void committed(long windowId); + + /** + * Triggers bucket to free memory which is already persisted in bucket data files. + * + * @return amount of memory freed in bytes. 
+ * @throws IOException + */ + long freeMemory() throws IOException; + + /** + * Allows the bucket to process/cache data which is recovered (from window files) after failure. + * + * @param windowId recovery window + * @param recoveredData recovered data + */ + void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue> recoveredData); + + enum ReadSource + { + MEMORY, //state in memory in key/value form + READERS, //these are streams in which the key will be searched and serialized. + ALL //both the above states. + } + + class BucketedValue + { + private long timeBucket; + private Slice value; + + protected BucketedValue() + { + } + + protected BucketedValue(long timeBucket, Slice value) + { + this.timeBucket = timeBucket; + this.value = value; + } + + protected long getTimeBucket() + { + return timeBucket; + } + + protected void setTimeBucket(long timeBucket) + { + this.timeBucket = timeBucket; + } + + public Slice getValue() + { + return value; + } + + public void setValue(Slice value) + { + this.value = value; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof BucketedValue)) { + return false; + } + + BucketedValue that = (BucketedValue)o; + + return timeBucket == that.timeBucket && value.equals(that.value); + + } + + @Override + public int hashCode() + { + return Objects.hash(timeBucket, value); + } + } + + /** + * Default bucket.<br/> + * Not thread-safe. + */ + class DefaultBucket implements Bucket + { + private final long bucketId; + + //Key -> Ordered values + private transient Map<Slice, BucketedValue> flash = Maps.newConcurrentMap(); + + //Data persisted in write ahead logs. 
window -> bucket + private final transient ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> checkpointedData = + new ConcurrentSkipListMap<>(); + + //Data persisted in bucket data files + private final transient Map<Slice, BucketedValue> committedData = Maps.newConcurrentMap(); + + //Data serialized/deserialized from bucket data files: key -> value from latest time bucket on file + private final transient Map<Slice, BucketedValue> fileCache = Maps.newConcurrentMap(); + + //TimeBucket -> FileReaders + private final transient Map<Long, FileAccess.FileReader> readers = Maps.newTreeMap(); + + protected transient ManagedStateContext managedStateContext; + + private AtomicLong sizeInBytes = new AtomicLong(0); + + private final transient Slice dummyGetKey = new Slice(null, 0, 0); + + private transient TreeSet<BucketsFileSystem.TimeBucketMeta> cachedBucketMetas; + + private DefaultBucket() + { + //for kryo + bucketId = -1; + } + + protected DefaultBucket(long bucketId) + { + this.bucketId = bucketId; + } + + @Override + public void setup(@NotNull ManagedStateContext managedStateContext) + { + this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context"); + } + + @Override + public long getBucketId() + { + return bucketId; + } + + @Override + public long getSizeInBytes() + { + return sizeInBytes.longValue(); + } + + private Slice getFromMemory(Slice key) + { + //search the cache for key + BucketedValue bucketedValue = flash.get(key); + if (bucketedValue != null) { + return bucketedValue.getValue(); + } + + for (Long window : checkpointedData.descendingKeySet()) { + //traverse the checkpointed data in reverse order + bucketedValue = checkpointedData.get(window).get(key); + if (bucketedValue != null) { + return bucketedValue.getValue(); + } + } + + bucketedValue = committedData.get(key); + if (bucketedValue != null) { + return bucketedValue.getValue(); + } + + bucketedValue = fileCache.get(key); + if (bucketedValue != null) { + 
return bucketedValue.getValue(); + } + + return null; + } + + private Slice getFromReaders(Slice key, long timeBucket) + { + try { + if (cachedBucketMetas == null) { + cachedBucketMetas = managedStateContext.getBucketsFileSystem().getAllTimeBuckets(bucketId); + } + if (timeBucket != -1) { + Slice valSlice = getValueFromTimeBucketReader(key, timeBucket); + if (valSlice != null) { + if (timeBucket == cachedBucketMetas.first().getTimeBucketId()) { + //if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache. + BucketedValue bucketedValue = new BucketedValue(timeBucket, valSlice); + fileCache.put(key, bucketedValue); + } + } + return valSlice; + } else { + //search all the time buckets + for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) { + + if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) { + //keys in the time bucket files are sorted so if the first key in the file is greater than the key being + //searched, the key will not be present in that file. + Slice valSlice = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId()); + if (valSlice != null) { + BucketedValue bucketedValue = new BucketedValue(immutableTimeBucketMeta.getTimeBucketId(), valSlice); + //Only when the key is read from the latest time bucket on the file, the key/value is put in the file + // cache. 
+ fileCache.put(key, bucketedValue); + return valSlice; + } + } + } + return null; + } + + } catch (IOException e) { + throw new RuntimeException("get time-buckets " + bucketId, e); + } + } + + @Override + public Slice get(Slice key, long timeBucket, ReadSource readSource) + { + switch (readSource) { + case MEMORY: + return getFromMemory(key); + case READERS: + return getFromReaders(key, timeBucket); + case ALL: + default: + Slice value = getFromMemory(key); + if (value != null) { + return value; + } + return getFromReaders(key, timeBucket); + } + } + + /** + * Returns the value for the key from a time-bucket reader + * @param key key + * @param timeBucket time bucket + * @return value if key is found in the time bucket; false otherwise + */ + private Slice getValueFromTimeBucketReader(Slice key, long timeBucket) + { + FileAccess.FileReader fileReader = readers.get(timeBucket); + if (fileReader != null) { + return readValue(fileReader, key, timeBucket); + } + //file reader is not loaded and is null + try { + if (loadFileReader(timeBucket)) { + return readValue(readers.get(timeBucket), key, timeBucket); + } + return null; + } catch (IOException e) { + throw new RuntimeException("while loading " + bucketId + ", " + timeBucket, e); + } + } + + private Slice readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket) + { + Slice valSlice = new Slice(null, 0, 0); + try { + if (fileReader.seek(key)) { + fileReader.next(dummyGetKey, valSlice); + return valSlice; + } else { + return null; + } + } catch (IOException e) { + throw new RuntimeException("reading " + bucketId + ", " + timeBucket, e); + } + } + + private boolean loadFileReader(long timeBucketId) throws IOException + { + BucketsFileSystem.TimeBucketMeta tbm = managedStateContext.getBucketsFileSystem() + .getTimeBucketMeta(bucketId, timeBucketId); + + if (tbm != null) { + FileAccess.FileReader reader = managedStateContext.getBucketsFileSystem().getReader(bucketId, + 
BucketsFileSystem.getFileName(timeBucketId)); + readers.put(timeBucketId, reader); + sizeInBytes.getAndAdd(tbm.getSizeInBytes()); + return true; + } + return false; + } + + @Override + public void put(Slice key, long timeBucket, Slice value) + { + BucketedValue bucketedValue = flash.get(key); + if (bucketedValue == null) { + bucketedValue = new BucketedValue(); + flash.put(key, bucketedValue); + sizeInBytes.getAndAdd(key.length); + sizeInBytes.getAndAdd(Long.SIZE); + } + if (timeBucket > bucketedValue.getTimeBucket()) { + + int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length; + sizeInBytes.getAndAdd(inc); + bucketedValue.setTimeBucket(timeBucket); + bucketedValue.setValue(value); + } + } + + @Override + public long freeMemory() throws IOException + { + LOG.debug("free space {}", bucketId); + long memoryFreed = 0; + for (Map.Entry<Slice, BucketedValue> entry : committedData.entrySet()) { + memoryFreed += entry.getKey().length + entry.getValue().getValue().length; + } + committedData.clear(); + fileCache.clear(); + if (cachedBucketMetas != null) { + + for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas) { + FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId()); + if (reader != null) { + memoryFreed += tbm.getSizeInBytes(); + reader.close(); + } + } + + } + sizeInBytes.getAndAdd(-memoryFreed); + return memoryFreed; + } + + @Override + public Map<Slice, BucketedValue> checkpoint(long windowId) + { + try { + //transferring the data from flash to check-pointed state in finally block and re-initializing the flash. 
+ return flash; + } finally { + checkpointedData.put(windowId, flash); + flash = Maps.newHashMap(); + } + } + + @Override + public void committed(long committedWindowId) + { + Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator(); + + while (stateIterator.hasNext()) { + Map.Entry<Long, Map<Slice, BucketedValue>> entry = stateIterator.next(); + + long savedWindow = entry.getKey(); + if (savedWindow <= committedWindowId) { + Map<Slice, BucketedValue> bucketData = entry.getValue(); + + //removing any stale values from the file cache + for (Slice key : bucketData.keySet()) { + fileCache.remove(key); + } + + for (BucketedValue bucketedValue : bucketData.values()) { + FileAccess.FileReader reader = readers.get(bucketedValue.getTimeBucket()); + if (reader != null) { + //closing the file reader for the time bucket if it is in memory because the time-bucket is modified + //so it will be re-written by BucketsDataManager + try { + LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket()); + reader.close(); + } catch (IOException e) { + throw new RuntimeException("closing reader " + bucketId + ", " + bucketedValue.getTimeBucket(), e); + } + readers.remove(bucketedValue.getTimeBucket()); + } + if (readers.isEmpty()) { + break; + } + } + committedData.putAll(bucketData); + stateIterator.remove(); + } else { + break; + } + } + + cachedBucketMetas = null; + } + + @Override + public void recoveredData(long recoveredWindow, Map<Slice, BucketedValue> data) + { + checkpointedData.put(recoveredWindow, data); + } + + @Override + public void teardown() + { + Set<Long> failureBuckets = Sets.newHashSet(); + for (Map.Entry<Long, FileAccess.FileReader> entry : readers.entrySet()) { + try { + LOG.debug("closing reader {} {}", bucketId, entry.getKey()); + entry.getValue().close(); + } catch (IOException e) { + //will try to close all readers + failureBuckets.add(entry.getKey()); + } + } + if (!failureBuckets.isEmpty()) { + 
StringBuilder builder = new StringBuilder("teardown of "); + builder.append(bucketId).append(" < "); + for (Long timeBucket : failureBuckets) { + builder.append(timeBucket); + } + builder.append(">"); + throw new RuntimeException(builder.toString()); + } + } + + @VisibleForTesting + Map<Long, FileAccess.FileReader> getReaders() + { + return readers; + } + + @VisibleForTesting + Map<Slice, BucketedValue> getCommittedData() + { + return committedData; + } + + @VisibleForTesting + ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> getCheckpointedData() + { + return checkpointedData; + } + + private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java new file mode 100644 index 0000000..8304fb6 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java @@ -0,0 +1,566 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentSkipListSet; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import com.google.common.collect.Table; +import com.google.common.collect.TreeBasedTable; + +import com.datatorrent.lib.fileaccess.FileAccess; +import com.datatorrent.netlet.util.Slice; + +/** + * Persists bucket data on disk and maintains meta information about the buckets. + * <p/> + * + * Each bucket has a meta-data file and the format of that is :<br/> + * <ol> + * <li>version of the meta data (int)</li> + * <li>total number of time-buckets (int)</li> + * <li>For each time bucket + * <ol> + * <li>time bucket key (long)</li> + * <li>size of data (sum of bytes) (long)</li> + * <li>last transferred window id (long)</li> + * <li>length of the first key in the time-bucket file (int)</li> + * <li>first key in the time-bucket file (byte[])</li> + * </ol> + * </li> + * </ol> + * <p/> + * Meta data information is updated by {@link IncrementalCheckpointManager}. Any updates are restricted to the package. 
+ */ +public class BucketsFileSystem implements ManagedStateComponent +{ + static final String META_FILE_NAME = "_META"; + private static final int META_FILE_VERSION = 1; + + private final transient TreeBasedTable<Long, Long, MutableTimeBucketMeta> timeBucketsMeta = TreeBasedTable.create(); + + //Check-pointed set of all buckets this instance has written to. + protected final Set<Long> bucketNamesOnFS = new ConcurrentSkipListSet<>(); + + protected transient ManagedStateContext managedStateContext; + + @Override + public void setup(@NotNull ManagedStateContext managedStateContext) + { + this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context"); + } + + protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException + { + return managedStateContext.getFileAccess().getWriter(bucketId, fileName); + } + + protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException + { + return managedStateContext.getFileAccess().getReader(bucketId, fileName); + } + + protected void rename(long bucketId, String fromName, String toName) throws IOException + { + managedStateContext.getFileAccess().rename(bucketId, fromName, toName); + } + + protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException + { + return managedStateContext.getFileAccess().getOutputStream(bucketId, fileName); + } + + protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException + { + return managedStateContext.getFileAccess().getInputStream(bucketId, fileName); + } + + protected boolean exists(long bucketId, String fileName) throws IOException + { + return managedStateContext.getFileAccess().exists(bucketId, fileName); + } + + protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException + { + return managedStateContext.getFileAccess().listFiles(bucketId); + } + + protected void delete(long bucketId, String fileName) throws 
IOException + { + managedStateContext.getFileAccess().delete(bucketId, fileName); + } + + protected void deleteBucket(long bucketId) throws IOException + { + managedStateContext.getFileAccess().deleteBucket(bucketId); + } + + /** + * Saves data to a bucket. The data consists of key/values of all time-buckets of a particular bucket. + * + * @param windowId window id + * @param bucketId bucket id + * @param data data of all time-buckets + * @throws IOException + */ + protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException + { + Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(), + managedStateContext.getKeyComparator()); + + for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) { + long timeBucketId = entry.getValue().getTimeBucket(); + timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue()); + } + + for (long timeBucket : timeBucketedKeys.rowKeySet()) { + BucketsFileSystem.MutableTimeBucketMeta tbm = getOrCreateTimeBucketMeta(bucketId, timeBucket); + addBucketName(bucketId); + + long dataSize = 0; + Slice firstKey = null; + + FileAccess.FileWriter fileWriter; + String tmpFileName = getTmpFileName(); + if (tbm.getLastTransferredWindowId() == -1) { + //A new time bucket so we append all the key/values to the new file + fileWriter = getWriter(bucketId, tmpFileName); + + for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) { + Slice key = entry.getKey(); + Slice value = entry.getValue().getValue(); + + dataSize += key.length; + dataSize += value.length; + + fileWriter.append(key.toByteArray(), value.toByteArray()); + if (firstKey == null) { + firstKey = key; + } + } + } else { + //the time bucket existed so we need to read the file and then re-write it + TreeMap<Slice, Slice> fileData = new TreeMap<>(managedStateContext.getKeyComparator()); + FileAccess.FileReader fileReader = 
getReader(bucketId, getFileName(timeBucket)); + fileReader.readFully(fileData); + fileReader.close(); + + for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) { + fileData.put(entry.getKey(), entry.getValue().getValue()); + } + + fileWriter = getWriter(bucketId, tmpFileName); + for (Map.Entry<Slice, Slice> entry : fileData.entrySet()) { + Slice key = entry.getKey(); + Slice value = entry.getValue(); + + dataSize += key.length; + dataSize += value.length; + + fileWriter.append(key.toByteArray(), value.toByteArray()); + if (firstKey == null) { + firstKey = key; + } + } + } + fileWriter.close(); + rename(bucketId, tmpFileName, getFileName(timeBucket)); + tbm.updateTimeBucketMeta(windowId, dataSize, firstKey); + } + + updateBucketMetaFile(bucketId); + } + + /** + * Retrieves the time bucket meta of a particular time-bucket. If the time bucket doesn't exist then a new one + * is created. + * + * @param bucketId bucket id + * @param timeBucketId time bucket id + * @return time bucket meta of the time bucket + * @throws IOException + */ + @NotNull + MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long timeBucketId) throws IOException + { + synchronized (timeBucketsMeta) { + MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId); + if (tbm == null) { + tbm = new MutableTimeBucketMeta(bucketId, timeBucketId); + timeBucketsMeta.put(bucketId, timeBucketId, tbm); + } + return tbm; + } + } + + protected void addBucketName(long bucketId) + { + bucketNamesOnFS.add(bucketId); + } + + /** + * Returns the time bucket meta of a particular time-bucket which is immutable. 
+ * + * @param bucketId bucket id + * @param timeBucketId time bucket id + * @return immutable time bucket meta + * @throws IOException + */ + @Nullable + public TimeBucketMeta getTimeBucketMeta(long bucketId, long timeBucketId) throws IOException + { + synchronized (timeBucketsMeta) { + MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId); + if (tbm != null) { + return tbm.getImmutableTimeBucketMeta(); + } + return null; + } + } + + /** + * This should be entered only after acquiring the lock on {@link #timeBucketsMeta} + * + * @param bucketId bucket id + * @param timeBucketId time bucket id + * @return Mutable time bucket meta for a bucket id and time bucket id. + * @throws IOException + */ + private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException + { + MutableTimeBucketMeta tbm = timeBucketsMeta.get(bucketId, timeBucketId); + if (tbm != null) { + return tbm; + } + if (exists(bucketId, META_FILE_NAME)) { + try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) { + //Load meta info of all the time buckets of the bucket identified by bucketId. + loadBucketMetaFile(bucketId, dis); + } + } else { + return null; + } + return timeBucketsMeta.get(bucketId, timeBucketId); + } + + /** + * Returns the meta information of all the time buckets in the bucket in descending order - latest to oldest. 
+ * + * @param bucketId bucket id + * @return all the time buckets in order - latest to oldest + */ + public TreeSet<TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException + { + synchronized (timeBucketsMeta) { + TreeSet<TimeBucketMeta> immutableTimeBucketMetas = Sets.newTreeSet( + Collections.<TimeBucketMeta>reverseOrder()); + + if (timeBucketsMeta.containsRow(bucketId)) { + for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) { + immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta()); + } + return immutableTimeBucketMetas; + } + if (exists(bucketId, META_FILE_NAME)) { + try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) { + //Load meta info of all the time buckets of the bucket identified by bucket id + loadBucketMetaFile(bucketId, dis); + for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBucketsMeta.row(bucketId).entrySet()) { + immutableTimeBucketMetas.add(entry.getValue().getImmutableTimeBucketMeta()); + } + return immutableTimeBucketMetas; + } + } + return immutableTimeBucketMetas; + } + } + + /** + * Loads the bucket meta-file. This should be entered only after acquiring the lock on {@link #timeBucketsMeta}. 
+ * + * @param bucketId bucket id + * @param dis data input stream + * @throws IOException + */ + private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException + { + int metaDataVersion = dis.readInt(); + + if (metaDataVersion == META_FILE_VERSION) { + int numberOfEntries = dis.readInt(); + + for (int i = 0; i < numberOfEntries; i++) { + long timeBucketId = dis.readLong(); + long dataSize = dis.readLong(); + long lastTransferredWindow = dis.readLong(); + + MutableTimeBucketMeta tbm = new MutableTimeBucketMeta(bucketId, timeBucketId); + + int sizeOfFirstKey = dis.readInt(); + byte[] firstKeyBytes = new byte[sizeOfFirstKey]; + dis.readFully(firstKeyBytes, 0, firstKeyBytes.length); + tbm.updateTimeBucketMeta(lastTransferredWindow, dataSize, new Slice(firstKeyBytes)); + + timeBucketsMeta.put(bucketId, timeBucketId, tbm); + } + } + } + + /** + * Saves the updated bucket meta on disk. + * + * @param bucketId bucket id + * @throws IOException + */ + void updateBucketMetaFile(long bucketId) throws IOException + { + Map<Long, MutableTimeBucketMeta> timeBuckets; + synchronized (timeBucketsMeta) { + timeBuckets = timeBucketsMeta.row(bucketId); + + Preconditions.checkNotNull(timeBuckets, "timeBuckets"); + String tmpFileName = getTmpFileName(); + + try (DataOutputStream dos = getOutputStream(bucketId, tmpFileName)) { + dos.writeInt(META_FILE_VERSION); + dos.writeInt(timeBuckets.size()); + for (Map.Entry<Long, MutableTimeBucketMeta> entry : timeBuckets.entrySet()) { + MutableTimeBucketMeta tbm = entry.getValue(); + dos.writeLong(tbm.getTimeBucketId()); + dos.writeLong(tbm.getSizeInBytes()); + dos.writeLong(tbm.getLastTransferredWindowId()); + dos.writeInt(tbm.getFirstKey().length); + dos.write(tbm.getFirstKey().toByteArray()); + } + + } + rename(bucketId, tmpFileName, META_FILE_NAME); + } + } + + protected void deleteTimeBucketsLessThanEqualTo(long latestExpiredTimeBucket) throws IOException + { + LOG.debug("delete files before {}", 
latestExpiredTimeBucket); + + for (long bucketName : bucketNamesOnFS) { + RemoteIterator<LocatedFileStatus> timeBucketsIterator = listFiles(bucketName); + boolean emptyBucket = true; + while (timeBucketsIterator.hasNext()) { + LocatedFileStatus timeBucketStatus = timeBucketsIterator.next(); + + String timeBucketStr = timeBucketStatus.getPath().getName(); + if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) { + //ignoring meta and tmp files + continue; + } + long timeBucket = Long.parseLong(timeBucketStr); + + if (timeBucket <= latestExpiredTimeBucket) { + LOG.debug("deleting bucket {} time-bucket {}", timeBucket); + invalidateTimeBucket(bucketName, timeBucket); + delete(bucketName, timeBucketStatus.getPath().getName()); + } else { + emptyBucket = false; + } + } + if (emptyBucket) { + LOG.debug("deleting bucket {}", bucketName); + deleteBucket(bucketName); + } + } + } + + void invalidateTimeBucket(long bucketId, long timeBucketId) throws IOException + { + synchronized (timeBucketsMeta) { + timeBucketsMeta.remove(bucketId, timeBucketId); + } + updateBucketMetaFile(bucketId); + } + + @Override + public void teardown() + { + } + + /** + * This serves the readers - {@link Bucket.DefaultBucket}. + * It is immutable and accessible outside the package unlike {@link MutableTimeBucketMeta}. 
+ */ + public static class TimeBucketMeta implements Comparable<TimeBucketMeta> + { + private final long bucketId; + private final long timeBucketId; + private long lastTransferredWindowId = -1; + private long sizeInBytes; + private Slice firstKey; + + private TimeBucketMeta() + { + //for kryo + bucketId = -1; + timeBucketId = -1; + } + + private TimeBucketMeta(long bucketId, long timeBucketId) + { + this.bucketId = bucketId; + this.timeBucketId = timeBucketId; + } + + public long getLastTransferredWindowId() + { + return lastTransferredWindowId; + } + + public long getSizeInBytes() + { + return this.sizeInBytes; + } + + public long getBucketId() + { + return bucketId; + } + + public long getTimeBucketId() + { + return timeBucketId; + } + + public Slice getFirstKey() + { + return firstKey; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof TimeBucketMeta)) { + return false; + } + + TimeBucketMeta that = (TimeBucketMeta)o; + + return bucketId == that.bucketId && timeBucketId == that.timeBucketId; + } + + @Override + public int hashCode() + { + return Objects.hash(bucketId, timeBucketId); + } + + @Override + public int compareTo(@NotNull TimeBucketMeta o) + { + if (bucketId < o.bucketId) { + return -1; + } + if (bucketId > o.bucketId) { + return 1; + } + if (timeBucketId < o.timeBucketId) { + return -1; + } + if (timeBucketId > o.timeBucketId) { + return 1; + } + return 0; + } + } + + /** + * Represents time bucket meta information which can be changed. + * The updates to an instance and read/creation of {@link #immutableTimeBucketMeta} belonging to it are synchronized + * as different threads are updating and reading from it.<br/> + * + * The instance is updated when data from window files are transferred to bucket files and + * {@link Bucket.DefaultBucket} reads the immutable time bucket meta. 
+ */ + static class MutableTimeBucketMeta extends TimeBucketMeta + { + private transient TimeBucketMeta immutableTimeBucketMeta; + + private volatile boolean changed; + + public MutableTimeBucketMeta(long bucketId, long timeBucketId) + { + super(bucketId, timeBucketId); + } + + synchronized void updateTimeBucketMeta(long lastTransferredWindow, long bytes, @NotNull Slice firstKey) + { + changed = true; + super.lastTransferredWindowId = lastTransferredWindow; + super.sizeInBytes = bytes; + super.firstKey = Preconditions.checkNotNull(firstKey, "first key"); + } + + synchronized TimeBucketMeta getImmutableTimeBucketMeta() + { + if (immutableTimeBucketMeta == null || changed) { + + immutableTimeBucketMeta = new TimeBucketMeta(getBucketId(), getTimeBucketId()); + immutableTimeBucketMeta.lastTransferredWindowId = getLastTransferredWindowId(); + immutableTimeBucketMeta.sizeInBytes = getSizeInBytes(); + immutableTimeBucketMeta.firstKey = getFirstKey(); + changed = false; + } + return immutableTimeBucketMeta; + } + + } + + protected static String getFileName(long timeBucketId) + { + return Long.toString(timeBucketId); + } + + protected static String getTmpFileName() + { + return System.currentTimeMillis() + ".tmp"; + } + + private static final Logger LOG = LoggerFactory.getLogger(BucketsFileSystem.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java new file mode 100644 index 0000000..a372163 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Queues; + +import com.datatorrent.api.Context; +import com.datatorrent.common.util.NameableThreadFactory; +import com.datatorrent.lib.util.WindowDataManager; +import com.datatorrent.netlet.util.Slice; + +/** + * Manages state which is written to files by windows. The state from the window files are then transferred to bucket + * data files. This class listens to time expiry events issued by {@link TimeBucketAssigner}. + * + * This component is also responsible for purging old time buckets. 
+ */
+public class IncrementalCheckpointManager extends WindowDataManager.FSWindowDataManager
+    implements ManagedStateComponent
+{
+  private static final String WAL_RELATIVE_PATH = "managed_state";
+
+  //windowId => (bucketId => data)
+  //ConcurrentSkipListMap: concurrent access from the operator thread (save) and the writer
+  //thread (transferWindowFiles), and sorted keys so committed() can iterate windows in order.
+  private final transient Map<Long, Map<Long, Map<Slice, Bucket.BucketedValue>>> savedWindows = new
+      ConcurrentSkipListMap<>();
+
+  //single-threaded background service that moves window files into bucket data files
+  private transient ExecutorService writerService;
+  //loop condition of the writer task; volatile so teardown() can stop the loop
+  private transient volatile boolean transfer;
+
+  //windows whose data is ready to be transferred to bucket data files
+  private final transient LinkedBlockingQueue<Long> windowsToTransfer = Queues.newLinkedBlockingQueue();
+  //first failure seen on the writer thread; re-thrown to the operator thread in save()
+  private final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
+
+  protected transient ManagedStateContext managedStateContext;
+
+  //latest expired time bucket to purge; -1 means nothing pending
+  private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1);
+
+  //writer-thread idle sleep, taken from the operator's SPIN_MILLIS attribute
+  private transient int waitMillis;
+
+
+  public IncrementalCheckpointManager()
+  {
+    super();
+    setRecoveryPath(WAL_RELATIVE_PATH);
+  }
+
+  /**
+   * Not supported; this component must be set up with a {@link ManagedStateContext} instead.
+   */
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    throw new UnsupportedOperationException("not supported");
+  }
+
+  /**
+   * Initializes the window data manager and starts the background writer thread that
+   * transfers window files to bucket data files and purges expired time buckets.
+   *
+   * @param managedStateContext managed state context
+   */
+  @Override
+  public void setup(@NotNull final ManagedStateContext managedStateContext)
+  {
+    this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+    waitMillis = managedStateContext.getOperatorContext().getValue(Context.OperatorContext.SPIN_MILLIS);
+    super.setup(managedStateContext.getOperatorContext());
+
+    writerService = Executors.newSingleThreadExecutor(new NameableThreadFactory("managed-state-writer"));
+    transfer = true;
+    writerService.submit(new Runnable()
+    {
+      @Override
+      public void run()
+      {
+        //runs until teardown() clears 'transfer'; transferWindowFiles() sleeps when idle
+        while (transfer) {
+          transferWindowFiles();
+          if (latestExpiredTimeBucket.get() > -1) {
+            try {
+              //getAndSet(-1) consumes the pending purge request before deleting
+              managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(
+                  latestExpiredTimeBucket.getAndSet(-1));
+            } catch (IOException e) {
+              //record the failure so save() can surface it on the operator thread
+              throwable.set(e);
+              LOG.debug("delete files", e);
+              Throwables.propagate(e);
+            }
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Runs on the background writer thread: takes one window off the transfer queue, writes each of
+   * its buckets to the bucket data files and then deletes the per-window artifact from the storage
+   * agent. Sleeps for {@code waitMillis} when the queue is empty.
+   */
+  protected void transferWindowFiles()
+  {
+    try {
+      Long windowId = windowsToTransfer.poll();
+      if (windowId != null) {
+        try {
+          LOG.debug("transfer window {}", windowId);
+          //bucket id => bucket data(key => value, time-buckets)
+          Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = savedWindows.remove(windowId);
+
+          for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
+            managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(),
+                singleBucket.getValue());
+          }
+          //the window's data now lives in bucket files, so the window artifact is no longer needed
+          storageAgent.delete(managedStateContext.getOperatorContext().getId(), windowId);
+        } catch (Throwable t) {
+          //record the failure so save() can re-throw it on the operator thread
+          throwable.set(t);
+          LOG.debug("transfer window {}", windowId, t);
+          Throwables.propagate(t);
+        }
+      } else {
+        Thread.sleep(waitMillis);
+      }
+    } catch (InterruptedException ex) {
+      //sleep can be interrupted by teardown so no need to re-throw interrupt exception
+      //NOTE(review): interrupt status is not restored here; the loop exits via the 'transfer'
+      //flag instead — confirm this is intended.
+      LOG.debug("interrupted", ex);
+    }
+  }
+
+  /**
+   * Not supported; use {@link #save(Map, int, long, boolean)} instead.
+   */
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    throw new UnsupportedOperationException("doesn't support saving any object");
+  }
+
+  /**
+   * The unsaved state combines data received in multiple windows. This window data manager persists this data
+   * on disk by the window id in which it was requested.
+   * @param unsavedData un-saved data of all buckets.
+   * @param operatorId operator id.
+   * @param windowId window id.
+   * @param skipWriteToWindowFile flag that enables/disables saving the window file.
+ * + * @throws IOException + */ + public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, int operatorId, long windowId, + boolean skipWriteToWindowFile) + throws IOException + { + Throwable lthrowable; + if ((lthrowable = throwable.get()) != null) { + LOG.error("Error while transferring"); + Throwables.propagate(lthrowable); + } + savedWindows.put(windowId, unsavedData); + + if (!skipWriteToWindowFile) { + super.save(unsavedData, operatorId, windowId); + } + } + + + + /** + * Transfers the data which has been committed till windowId to data files. + * + * @param operatorId operator id + * @param windowId window id + */ + @SuppressWarnings("UnusedParameters") + protected void committed(int operatorId, long windowId) throws IOException, InterruptedException + { + LOG.debug("data manager committed {}", windowId); + for (Long currentWindow : savedWindows.keySet()) { + if (currentWindow <= windowId) { + LOG.debug("to transfer {}", windowId); + windowsToTransfer.add(currentWindow); + } else { + break; + } + } + } + + @Override + public void teardown() + { + super.teardown(); + transfer = false; + writerService.shutdownNow(); + } + + public void setLatestExpiredTimeBucket(long timeBucket) + { + latestExpiredTimeBucket.set(timeBucket); + } + + private static final Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManager.class); + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java new file mode 100644 index 0000000..12928f1 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * Managed state has a limit on amount of data in memory.
+ * NOTE(review): how state is kept under this cap is implementation-specific — verify
+ * against the concrete managed-state implementation.
+ */
+public interface ManagedState
+{
+  /**
+   * Sets the maximum memory size.
+   * @param bytes max size in bytes.
+   */
+  void setMaxMemorySize(long bytes);
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
new file mode 100644
index 0000000..1044e15
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Lifecycle contract for components that are initialized with a {@link ManagedStateContext}
+ * instead of an operator context.
+ */
+public interface ManagedStateComponent
+{
+  /**
+   * Callback to setup using managed state context
+   *
+   * @param managedStateContext managed state context
+   */
+  void setup(@NotNull ManagedStateContext managedStateContext);
+
+  /**
+   * Callback to perform teardown.
+   */
+  void teardown();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
new file mode 100644
index 0000000..406fdbd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Comparator;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Shared context handed to managed-state components; exposes the collaborators that the
+ * managed-state implementation owns.
+ */
+public interface ManagedStateContext
+{
+  //file access layer used for bucket data files
+  FileAccess getFileAccess();
+
+  //operator context of the owning operator
+  Context.OperatorContext getOperatorContext();
+
+  //comparator that defines key ordering within bucket files
+  Comparator<Slice> getKeyComparator();
+
+  //component that reads/writes bucket data files
+  BucketsFileSystem getBucketsFileSystem();
+
+  //component that maps event times to time buckets
+  TimeBucketAssigner getTimeBucketAssigner();
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
new file mode 100644
index 0000000..4c3cf84
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Basic implementation of {@link AbstractManagedStateImpl} where system time corresponding to an application window is
+ * used to sub-group key of a particular bucket.<br/>
+ */
+public class ManagedStateImpl extends AbstractManagedStateImpl implements BucketedState
+{
+  //base timestamp used to derive the time bucket for incoming keys; advanced by one
+  //application window's worth of milliseconds in endWindow(). Non-transient, so the
+  //value is checkpointed and restored with the operator.
+  private long time = System.currentTimeMillis();
+  //milliseconds covered by one application window; derived in setup()
+  private transient long timeIncrement;
+
+  public ManagedStateImpl()
+  {
+    //a single bucket by default; callers may widen via setNumBuckets()
+    this.numBuckets = 1;
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+    //application window length in ms = streaming windows per app window * streaming window size
+    timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+        context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+  }
+
+  @Override
+  public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
+  {
+    //all keys written during the same application window land in the same time bucket,
+    //since 'time' only advances in endWindow()
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    putInBucket(bucketId, timeBucket, key, value);
+  }
+
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    //-1 time bucket: lookup is not pinned to a specific time bucket — presumably searches
+    //across the bucket's time buckets; confirm against AbstractManagedStateImpl
+    return getValueFromBucketSync(bucketId, -1, key);
+  }
+
+  /**
+   * Returns the future using which the value is obtained.<br/>
+   * If the key is present in the bucket cache, then the future has its value set when constructed;
+   * if not the value is set after it's read from the data files which is after a while.
+   *
+   * @param key key
+   * @return value of the key if found; null if the key is not found;
+   */
+  @Override
+  public Future<Slice> getAsync(long bucketId, @NotNull Slice key)
+  {
+    return getValueFromBucketAsync(bucketId, -1, key);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+    //advance the logical clock by one application window
+    time += timeIncrement;
+  }
+
+  @Min(1)
+  @Override
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets.
+   *
+   * @param numBuckets number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+}
