/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.state.managed;

import java.util.concurrent.Future;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;

import com.datatorrent.api.annotation.OperatorAnnotation;
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState;
import com.datatorrent.netlet.util.Slice;

/**
 * This implementation of {@link AbstractManagedStateImpl} lets the client specify the time for each key.
 * The value of time is used to derive the time-bucket of a key.
 */
@OperatorAnnotation(checkpointableWithinAppWindow = false)
public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements TimeSlicedBucketedState
{
  public ManagedTimeStateImpl()
  {
    //by default all state goes into a single bucket; use setNumBuckets(...) to partition further.
    this.numBuckets = 1;
  }

  /**
   * Saves the key/value pair in the given bucket. The time-bucket of the key is derived from {@code time}.
   *
   * @param bucketId identifier of the bucket.
   * @param time     event time from which the time-bucket is derived.
   * @param key      key
   * @param value    value
   */
  @Override
  public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
  {
    //NOTE(review): if time is already expired, getTimeBucketFor returns -1 and the entry is still written with
    //time-bucket -1 — confirm putInBucket tolerates this or that callers never put expired times.
    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    putInBucket(bucketId, timeBucket, key, value);
  }

  /**
   * Synchronously fetches the value of the key from the given bucket without a time hint.
   *
   * @param bucketId identifier of the bucket.
   * @param key      key
   * @return value of the key if found; null otherwise.
   */
  @Override
  public Slice getSync(long bucketId, @NotNull Slice key)
  {
    //-1 here is "no specific time-bucket"; presumably getValueFromBucketSync then searches all
    //time-buckets of this bucket — confirm in AbstractManagedStateImpl.
    return getValueFromBucketSync(bucketId, -1, key);
  }

  /**
   * Synchronously fetches the value of the key, restricting the search to the time-bucket derived from
   * {@code time}.
   *
   * @param bucketId identifier of the bucket.
   * @param time     event time that narrows the search to a single time-bucket.
   * @param key      key
   * @return {@link BucketedState#EXPIRED} if time is expired; value of the key otherwise.
   */
  @Override
  public Slice getSync(long bucketId, long time, @NotNull Slice key)
  {
    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    if (timeBucket == -1) {
      //time is expired so no point in looking further.
      return BucketedState.EXPIRED;
    }
    return getValueFromBucketSync(bucketId, timeBucket, key);
  }

  /**
   * Asynchronously fetches the value of the key from the given bucket without a time hint.
   *
   * @param bucketId identifier of the bucket.
   * @param key      key
   * @return future containing the value of the key.
   */
  @Override
  public Future<Slice> getAsync(long bucketId, Slice key)
  {
    return getValueFromBucketAsync(bucketId, -1, key);
  }

  /**
   * Asynchronously fetches the value of the key, restricting the search to the time-bucket derived from
   * {@code time}.
   *
   * @param bucketId identifier of the bucket.
   * @param time     event time that narrows the search to a single time-bucket.
   * @param key      key
   * @return immediate {@link BucketedState#EXPIRED} future if time is expired; future of the value otherwise.
   */
  @Override
  public Future<Slice> getAsync(long bucketId, long time, Slice key)
  {
    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    if (timeBucket == -1) {
      //time is expired so no point in looking further.
      return Futures.immediateFuture(BucketedState.EXPIRED);
    }
    return getValueFromBucketAsync(bucketId, timeBucket, key);
  }

  @Min(1)
  @Override
  public int getNumBuckets()
  {
    return numBuckets;
  }

  /**
   * Sets the number of buckets.
   *
   * @param numBuckets number of buckets; must be at least 1.
   * @throws IllegalArgumentException if numBuckets is not positive.
   */
  public void setNumBuckets(int numBuckets)
  {
    //enforce the @Min(1) contract eagerly instead of waiting for bean validation.
    Preconditions.checkArgument(numBuckets > 0, "number of buckets should be > 0 but was %s", numBuckets);
    this.numBuckets = numBuckets;
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.state.managed;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;

import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;

import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.netlet.util.Slice;

/**
 * In this implementation of {@link AbstractManagedStateImpl} the buckets in memory are time-buckets, i.e.
 * the time-bucket derived from an event time is used directly as the bucket id.
 */
public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implements BucketedState
{
  //time-buckets purged by the assigner; drained in endWindow on the operator thread.
  private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = Queues.newLinkedBlockingQueue();
  //buckets that were purged/replaced but may still have pending async reads; torn down lazily.
  private final transient Set<Bucket> bucketsForTeardown = Sets.newHashSet();

  public ManagedTimeUnifiedStateImpl()
  {
    bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
  }

  /**
   * @return the number of time-buckets managed by the time-bucket assigner.
   */
  @Override
  public int getNumBuckets()
  {
    return timeBucketAssigner.getNumBuckets();
  }

  /**
   * Saves the key/value pair in the time-bucket derived from {@code time}; the time-bucket also serves as
   * the bucket id.
   *
   * @param time  event time
   * @param key   key
   * @param value value
   */
  @Override
  public void put(long time, @NotNull Slice key, @NotNull Slice value)
  {
    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    putInBucket(timeBucket, timeBucket, key, value);
  }

  /**
   * Synchronously fetches the value of the key from the time-bucket derived from {@code time}.
   *
   * @param time event time
   * @param key  key
   * @return {@link BucketedState#EXPIRED} if time is expired; value of the key otherwise.
   */
  @Override
  public Slice getSync(long time, @NotNull Slice key)
  {
    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    if (timeBucket == -1) {
      //time is expired so return expired slice.
      return BucketedState.EXPIRED;
    }
    return getValueFromBucketSync(timeBucket, timeBucket, key);
  }

  /**
   * Asynchronously fetches the value of the key from the time-bucket derived from {@code time}.
   *
   * @param time event time
   * @param key  key
   * @return immediate {@link BucketedState#EXPIRED} future if time is expired; future of the value otherwise.
   */
  @Override
  public Future<Slice> getAsync(long time, @NotNull Slice key)
  {
    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
    if (timeBucket == -1) {
      //time is expired so return expired slice.
      return Futures.immediateFuture(BucketedState.EXPIRED);
    }
    return getValueFromBucketAsync(timeBucket, timeBucket, key);
  }

  @Override
  public void endWindow()
  {
    super.endWindow();
    Long purgedTimeBucket;

    //collect all the purged time buckets
    while (null != (purgedTimeBucket = purgedTimeBuckets.poll())) {
      int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
      //only detach the slot if it still holds the purged bucket (the slot may have been reused).
      if (buckets[purgedTimeBucketIdx] != null && buckets[purgedTimeBucketIdx].getBucketId() == purgedTimeBucket) {
        bucketsForTeardown.add(buckets[purgedTimeBucketIdx]);
        buckets[purgedTimeBucketIdx] = null;
      }
    }

    //tear down all the eligible time buckets
    Iterator<Bucket> bucketIterator = bucketsForTeardown.iterator();
    while (bucketIterator.hasNext()) {
      Bucket bucket = bucketIterator.next();
      if (!tasksPerBucketId.containsKey(bucket.getBucketId())) {
        //no pending asynchronous queries for this bucket id
        bucket.teardown();
        bucketIterator.remove();
      }
    }
  }

  @Override
  protected void handleBucketConflict(int bucketIdx, long newBucketId)
  {
    Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < newBucketId, "new time bucket should have a value"
        + " greater than the old time bucket");
    //Time buckets are purged periodically so here a bucket conflict is expected and so we just ignore conflicts.
    //The displaced bucket cannot be torn down immediately because async reads may still reference it.
    bucketsForTeardown.add(buckets[bucketIdx]);
    buckets[bucketIdx] = newBucket(newBucketId);
    buckets[bucketIdx].setup(this);
  }

  @Override
  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
  {
    //queue the purge; the actual detach/teardown happens in endWindow on the operator thread.
    purgedTimeBuckets.add(timeBucket);
    super.purgeTimeBucketsLessThanEqualTo(timeBucket);
  }

  /**
   * This uses operator id instead of bucket id as the name of parent folder of time-buckets. This is because
   * multiple partitions may work on same time-buckets.
   */
  private static class TimeUnifiedBucketsFileSystem extends BucketsFileSystem
  {
    @Override
    protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getWriter(managedStateContext.getOperatorContext().getId(), fileName);
    }

    @Override
    protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getReader(managedStateContext.getOperatorContext().getId(), fileName);
    }

    @Override
    protected void rename(long bucketId, String fromName, String toName) throws IOException
    {
      managedStateContext.getFileAccess().rename(managedStateContext.getOperatorContext().getId(), fromName, toName);
    }

    @Override
    protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getOutputStream(managedStateContext.getOperatorContext().getId(),
          fileName);
    }

    @Override
    protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().getInputStream(managedStateContext.getOperatorContext().getId(),
          fileName);
    }

    @Override
    protected boolean exists(long bucketId, String fileName) throws IOException
    {
      return managedStateContext.getFileAccess().exists(managedStateContext.getOperatorContext().getId(),
          fileName);
    }

    @Override
    protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
    {
      return managedStateContext.getFileAccess().listFiles(managedStateContext.getOperatorContext().getId());
    }

    @Override
    protected void delete(long bucketId, String fileName) throws IOException
    {
      managedStateContext.getFileAccess().delete(managedStateContext.getOperatorContext().getId(), fileName);
    }

    @Override
    protected void deleteBucket(long bucketId) throws IOException
    {
      managedStateContext.getFileAccess().deleteBucket(managedStateContext.getOperatorContext().getId());
    }

    @Override
    protected void addBucketName(long bucketId)
    {
      //track by operator id, not bucket id, since all of this partition's data lives under the operator-id folder.
      long operatorId = (long)managedStateContext.getOperatorContext().getId();
      if (!bucketNamesOnFS.contains(operatorId)) {
        bucketNamesOnFS.add(operatorId);
      }
    }
  }

  //was "private static transient": transient is meaningless on a static field and loggers should be final.
  private static final Logger LOG = LoggerFactory.getLogger(ManagedTimeUnifiedStateImpl.class);
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.state.managed;

import java.io.IOException;
import java.util.Comparator;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

import javax.validation.constraints.NotNull;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
 * Tracks the size of state in memory and evicts buckets.
 * <p/>
 * Runs periodically (as a {@link TimerTask}) and, when the in-memory state exceeds the configured maximum,
 * asks the least-recently-accessed buckets to free memory until the state fits again.
 */
class StateTracker extends TimerTask
{
  //bucket id -> bucket id & time wrapper; lets bucketAccessed find a bucket's heap entry without scanning.
  private final transient ConcurrentHashMap<Long, BucketIdTimeWrapper> bucketAccessTimes = new ConcurrentHashMap<>();

  //ordered by last-access time (oldest first), ties broken by bucket id; acts as an LRU "heap".
  private transient ConcurrentSkipListSet<BucketIdTimeWrapper> bucketHeap;

  private final transient Timer memoryFreeService = new Timer();

  protected transient AbstractManagedStateImpl managedStateImpl;

  void setup(@NotNull AbstractManagedStateImpl managedStateImpl)
  {
    this.managedStateImpl = Preconditions.checkNotNull(managedStateImpl, "managed state impl");

    this.bucketHeap = new ConcurrentSkipListSet<>(
        new Comparator<BucketIdTimeWrapper>()
        {
          //Note: this comparator imposes orderings that are inconsistent with equals.
          @Override
          public int compare(BucketIdTimeWrapper o1, BucketIdTimeWrapper o2)
          {
            if (o1.getLastAccessedTime() < o2.getLastAccessedTime()) {
              return -1;
            }
            if (o1.getLastAccessedTime() > o2.getLastAccessedTime()) {
              return 1;
            }
            //break access-time ties by bucket id so distinct buckets are never collapsed by the set.
            return Long.compare(o1.bucketId, o2.bucketId);
          }
        });
    long intervalMillis = managedStateImpl.getCheckStateSizeInterval().getMillis();
    memoryFreeService.scheduleAtFixedRate(this, intervalMillis, intervalMillis);
  }

  /**
   * Records an access of the given bucket so eviction treats it as recently used.
   *
   * @param bucketId id of the accessed bucket.
   */
  void bucketAccessed(long bucketId)
  {
    BucketIdTimeWrapper idTimeWrapper = bucketAccessTimes.get(bucketId);
    if (idTimeWrapper != null) {
      //remove the entry before mutating its access time; changing the sort key of an element while it is
      //inside the skip-list set would corrupt the ordering.
      bucketHeap.remove(idTimeWrapper);
    } else {
      idTimeWrapper = new BucketIdTimeWrapper(bucketId);
      //Fix: remember the wrapper. Without this put, the lookup above always misses and every access adds a
      //fresh duplicate entry for the same bucket to the heap.
      bucketAccessTimes.put(bucketId, idTimeWrapper);
    }
    idTimeWrapper.setLastAccessedTime(System.currentTimeMillis());
    bucketHeap.add(idTimeWrapper);
  }

  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
  @Override
  public void run()
  {
    synchronized (managedStateImpl.commitLock) {
      //freeing of state needs to be stopped during commit as commit results in transferring data to a state which
      // can be freed up as well.
      long bytesSum = 0;
      for (Bucket bucket : managedStateImpl.buckets) {
        if (bucket != null) {
          bytesSum += bucket.getSizeInBytes();
        }
      }

      if (bytesSum > managedStateImpl.getMaxMemorySize()) {
        Duration duration = managedStateImpl.getDurationPreventingFreeingSpace();
        long durationMillis = 0;
        if (duration != null) {
          durationMillis = duration.getMillis();
        }

        BucketIdTimeWrapper idTimeWrapper;
        while (bytesSum > managedStateImpl.getMaxMemorySize() && !bucketHeap.isEmpty() &&
            null != (idTimeWrapper = bucketHeap.first())) {
          //trigger buckets to free space

          if (System.currentTimeMillis() - idTimeWrapper.getLastAccessedTime() < durationMillis) {
            //if the least recently used bucket cannot free up space because it was accessed within the
            //specified duration then subsequent buckets cannot free space as well because this heap is ordered
            //by time.
            break;
          }
          long bucketId = idTimeWrapper.bucketId;
          Bucket bucket = managedStateImpl.getBucket(bucketId);
          if (bucket != null) {
            synchronized (bucket) {
              long sizeFreed;
              try {
                sizeFreed = bucket.freeMemory();
                LOG.debug("size freed {} {}", bucketId, sizeFreed);
              } catch (IOException e) {
                managedStateImpl.throwable.set(e);
                throw new RuntimeException("freeing " + bucketId, e);
              }
              bytesSum -= sizeFreed;
            }
          }
          //Fix: always drop the processed entry, even when the bucket no longer exists. Previously these
          //removes were inside the null-check, so a null bucket left bucketHeap.first() unchanged and this
          //loop never terminated.
          bucketHeap.remove(idTimeWrapper);
          bucketAccessTimes.remove(bucketId);
        }
      }
    }
  }

  void teardown()
  {
    memoryFreeService.cancel();
  }

  /**
   * Wrapper class for bucket id and the last time the bucket was accessed.
   */
  private static class BucketIdTimeWrapper
  {
    private final long bucketId;
    private long lastAccessedTime;

    BucketIdTimeWrapper(long bucketId)
    {
      this.bucketId = bucketId;
    }

    private synchronized long getLastAccessedTime()
    {
      return lastAccessedTime;
    }

    private synchronized void setLastAccessedTime(long lastAccessedTime)
    {
      this.lastAccessedTime = lastAccessedTime;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (!(o instanceof BucketIdTimeWrapper)) {
        return false;
      }

      BucketIdTimeWrapper that = (BucketIdTimeWrapper)o;
      //Note: the comparator used with bucket heap imposes orderings that are inconsistent with equals
      return bucketId == that.bucketId;
    }

    @Override
    public int hashCode()
    {
      return (int)(bucketId ^ (bucketId >>> 32));
    }
  }

  private static final Logger LOG = LoggerFactory.getLogger(StateTracker.class);

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.state.managed;

import javax.validation.constraints.NotNull;

import org.joda.time.Duration;
import org.joda.time.Instant;

import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;

import com.datatorrent.api.Context;
import com.datatorrent.lib.appdata.query.WindowBoundedService;

/**
 * Keeps track of time buckets.<br/>
 *
 * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
 * which time-bucket an event falls into and sliding the time boundaries.
 * <p/>
 *
 * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
 * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/>
 * For eg. if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
 * <code>referenceInstant = current-time</code>, then <code>
 * numBuckets = 60 minutes/ 30 minutes = 2 </code>.<br/>
 *
 * These properties once configured shouldn't be changed because that will result in different time-buckets
 * for the same (key,time) pair after a failure.
 * <p/>
 *
 * The time boundaries- start and end, periodically move by span of a single time-bucket. Any event with time &lt;
 * start is expired. These boundaries slide between application window by the expiry task asynchronously.<br/>
 * The boundaries move only between an application window to ensure consistency of a checkpoint. Checkpoint will
 * happen at application window boundaries so if we do not restrict moving start and end within an app window
 * boundary, it may happen that old value of 'start' is saved with the new value of 'end'.
 *
 * <p/>
 *
 * The boundaries can also be moved by {@link #getTimeBucketFor(long)}. The time which is passed as an argument to
 * this method can be ahead of <code>end</code>. This means that the corresponding event is a future event
 * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
 */
public class TimeBucketAssigner implements ManagedStateComponent
{
  //instant from which fixedStart is derived; defaults to construction time.
  @NotNull
  private Instant referenceInstant = new Instant();

  //events older than (referenceInstant - expireBefore) are considered expired.
  @NotNull
  @FieldSerializer.Bind(JavaSerializer.class)
  private Duration expireBefore = Duration.standardDays(2);

  //length of a single time-bucket; when null it is derived from the application window in setup().
  @FieldSerializer.Bind(JavaSerializer.class)
  private Duration bucketSpan;

  private long bucketSpanMillis;

  //sliding window boundaries; events with time < start are expired. Checkpointed (non-transient) so the
  //window position survives failure/recovery.
  private long start;
  private long end;
  private int numBuckets;
  //fixedStart never moves after setup; time-bucket keys are offsets from it, so keys stay stable as the
  //window slides.
  private transient long fixedStart;
  private transient long lowestTimeBucket;

  //guards one-time derivation of bucketSpan/start/numBuckets/end across restarts.
  private boolean initialized;

  private transient WindowBoundedService windowBoundedService;

  private transient PurgeListener purgeListener = null;

  //runs asynchronously (via WindowBoundedService) between application windows; slides the boundaries by one
  //time-bucket and notifies the purge listener of the time-bucket that just expired.
  private final transient Runnable expiryTask = new Runnable()
  {
    @Override
    public void run()
    {
      synchronized (lock) {
        start += bucketSpanMillis;
        end += bucketSpanMillis;
        if (purgeListener != null) {
          //post-increment: purge up to the current lowest time-bucket, then advance it.
          purgeListener.purgeTimeBucketsLessThanEqualTo(lowestTimeBucket++);
        }
      }
    }
  };

  //protects start/end (and lowestTimeBucket) which are mutated by both expiryTask and getTimeBucketFor.
  private final transient Object lock = new Object();

  @Override
  public void setup(@NotNull ManagedStateContext managedStateContext)
  {
    Context.OperatorContext context = managedStateContext.getOperatorContext();
    fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();

    if (!initialized) {
      if (bucketSpan == null) {
        //default span = one application window in wall-clock time.
        bucketSpan = Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
            context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS));
      }
      start = fixedStart;
      bucketSpanMillis = bucketSpan.getMillis();
      //ceiling division: enough buckets to cover the whole expireBefore duration.
      numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
      end = start + (numBuckets * bucketSpanMillis);

      initialized = true;
    }
    //on recovery, start may have slid past fixedStart; recompute which time-bucket is the oldest live one.
    lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
    windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask);
    windowBoundedService.setup(context);
  }

  public void beginWindow(long windowId)
  {
    windowBoundedService.beginWindow(windowId);
  }

  public void endWindow()
  {
    windowBoundedService.endWindow();
  }

  /**
   * Get the bucket key for the long value.
   *
   * @param value value from which bucket key is derived.
   * @return -1 if value is already expired; bucket key otherwise.
   */
  public long getTimeBucketFor(long value)
  {
    synchronized (lock) {
      if (value < start) {
        return -1;
      }
      //key is an offset from the immovable fixedStart, so it is stable even as start/end slide.
      long diffFromStart = value - fixedStart;
      long key = diffFromStart / bucketSpanMillis;
      if (value > end) {
        //future event: slide the window forward by whole time-buckets until it covers the value.
        long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis;
        start += move;
        end += move;
      }
      return key;
    }
  }

  public void setPurgeListener(@NotNull PurgeListener purgeListener)
  {
    this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
  }

  @Override
  public void teardown()
  {
    windowBoundedService.teardown();
  }

  /**
   * @return number of buckets.
   */
  public int getNumBuckets()
  {
    return numBuckets;
  }

  /**
   * @return reference instant
   */
  public Instant getReferenceInstant()
  {
    return referenceInstant;
  }

  /**
   * Sets the reference instant (by default the system time when the streaming app is created).
   * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}.
   *
   * @param referenceInstant reference instant
   */
  public void setReferenceInstant(Instant referenceInstant)
  {
    this.referenceInstant = referenceInstant;
  }

  /**
   * @return duration before which the data is expired.
   */
  public Duration getExpireBefore()
  {
    return expireBefore;
  }

  /**
   * Sets the duration which denotes expiry. Any event with time before this duration is considered to be
   * expired.
   *
   * @param expireBefore duration
   */
  public void setExpireBefore(Duration expireBefore)
  {
    this.expireBefore = expireBefore;
  }

  /**
   * @return time-bucket span
   */
  public Duration getBucketSpan()
  {
    return bucketSpan;
  }

  /**
   * Sets the length of a time bucket.
   *
   * @param bucketSpan length of time bucket
   */
  public void setBucketSpan(Duration bucketSpan)
  {
    this.bucketSpan = bucketSpan;
  }

  /**
   * The listener is informed when the time slides and time buckets which are older than the smallest time bucket
   * (changed because of time slide) can be purged.
   */
  public interface PurgeListener
  {
    void purgeTimeBucketsLessThanEqualTo(long timeBucket);
  }

}
See the License for the + * specific language governing permissions and limitations + * under the License. + */ [email protected] +package org.apache.apex.malhar.lib.state.managed; + +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java new file mode 100644 index 0000000..e75d867 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ [email protected] +package org.apache.apex.malhar.lib.state; + +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java new file mode 100644 index 0000000..51e9a13 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeSet; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +public class BucketsFileSystemTest +{ + class TestMeta extends TestWatcher + { + BucketsFileSystem bucketsFileSystem; + String applicationPath; + MockManagedStateContext managedStateContext; + + @Override + protected void starting(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + + managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(7)); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + managedStateContext.getFileAccess().init(); + + bucketsFileSystem = new BucketsFileSystem(); + + } + + @Override + protected void finished(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + } + + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testTransferBucket() throws IOException + { + testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100); + testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0); + + ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1); + testMeta.bucketsFileSystem.teardown(); + } + + @Test + public void testTransferOfExistingBucket() throws IOException + { + testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = 
ManagedStateTestUtils.getTestBucketData(0, 100); + testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0); + + Map<Slice, Bucket.BucketedValue> more = ManagedStateTestUtils.getTestBucketData(50, 100); + testMeta.bucketsFileSystem.writeBucketData(10, 0, more); + + unsavedBucket0.putAll(more); + ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2); + testMeta.bucketsFileSystem.teardown(); + } + + @Test + public void testUpdateBucketMetaDataFile() throws IOException + { + testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); + BucketsFileSystem.MutableTimeBucketMeta mutableTbm = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1); + mutableTbm.updateTimeBucketMeta(10, 100, new Slice("1".getBytes())); + + testMeta.bucketsFileSystem.updateBucketMetaFile(1); + BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1); + Assert.assertNotNull(immutableTbm); + Assert.assertEquals("last transferred window", 10, immutableTbm.getLastTransferredWindowId()); + Assert.assertEquals("size in bytes", 100, immutableTbm.getSizeInBytes()); + Assert.assertEquals("first key", "1", immutableTbm.getFirstKey().stringValue()); + testMeta.bucketsFileSystem.teardown(); + } + + @Test + public void testGetTimeBucketMeta() throws IOException + { + testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); + BucketsFileSystem.TimeBucketMeta bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1); + Assert.assertNull("bucket meta", bucketMeta); + + testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1); + bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1); + Assert.assertNotNull("bucket meta not null", bucketMeta); + testMeta.bucketsFileSystem.teardown(); + } + + @Test + public void testGetAllTimeBucketMeta() throws IOException + { + testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); + BucketsFileSystem.MutableTimeBucketMeta tbm1 = 
testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1); + tbm1.updateTimeBucketMeta(10, 100, new Slice("1".getBytes())); + + BucketsFileSystem.MutableTimeBucketMeta tbm2 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 2); + tbm2.updateTimeBucketMeta(10, 100, new Slice("2".getBytes())); + + testMeta.bucketsFileSystem.updateBucketMetaFile(1); + TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas = + testMeta.bucketsFileSystem.getAllTimeBuckets(1); + + Iterator<BucketsFileSystem.TimeBucketMeta> iterator = timeBucketMetas.iterator(); + int i = 2; + while (iterator.hasNext()) { + BucketsFileSystem.TimeBucketMeta tbm = iterator.next(); + Assert.assertEquals("time bucket " + i, i, tbm.getTimeBucketId()); + i--; + } + testMeta.bucketsFileSystem.teardown(); + } + + @Test + public void testInvalidateTimeBucket() throws IOException + { + testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); + testGetAllTimeBucketMeta(); + testMeta.bucketsFileSystem.invalidateTimeBucket(1, 1); + BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1,1); + Assert.assertNull("deleted tbm", immutableTbm); + + TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas = + testMeta.bucketsFileSystem.getAllTimeBuckets(1); + + Assert.assertEquals("only 1 tbm", 1, timeBucketMetas.size()); + immutableTbm = timeBucketMetas.iterator().next(); + + Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId()); + testMeta.bucketsFileSystem.teardown(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java new file mode 100644 index 0000000..cb8a97f --- /dev/null +++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.lib.fileaccess.FileAccess; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +public class DefaultBucketTest +{ + + class TestMeta extends TestWatcher + { + Bucket.DefaultBucket defaultBucket; + String applicationPath; + MockManagedStateContext managedStateContext; + + @Override + protected void starting(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9)); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + 
managedStateContext.getFileAccess().init(); + + defaultBucket = new Bucket.DefaultBucket(1); + managedStateContext.getBucketsFileSystem().setup(managedStateContext); + } + + @Override + protected void finished(Description description) + { + managedStateContext.getBucketsFileSystem().teardown(); + ManagedStateTestUtils.cleanTargetDir(description); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testPut() + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.defaultBucket.put(one, 1, one); + + Slice value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.MEMORY); + Assert.assertEquals("value one", one, value); + + value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.READERS); + Assert.assertNull("value not present", value); + + Assert.assertEquals("size of bucket", one.length * 2 + 64, testMeta.defaultBucket.getSizeInBytes()); + testMeta.defaultBucket.teardown(); + } + + @Test + public void testGetFromReader() throws IOException + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + Slice one = ManagedStateTestUtils.getSliceFor("1"); + + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100); + testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0); + + ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1); + + Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.READERS); + Assert.assertEquals("value one", one, value); + + testMeta.defaultBucket.teardown(); + } + + @Test + public void testGetFromSpecificTimeBucket() throws IOException + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + Slice one = ManagedStateTestUtils.getSliceFor("1"); + + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100); + 
testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0); + + ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1); + + Slice value = testMeta.defaultBucket.get(one, 101, Bucket.ReadSource.READERS); + Assert.assertEquals("value one", one, value); + + testMeta.defaultBucket.teardown(); + } + + @Test + public void testCheckpointed() + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testPut(); + Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10); + Assert.assertEquals("size", 1, unsaved.size()); + + Map.Entry<Slice, Bucket.BucketedValue> entry = unsaved.entrySet().iterator().next(); + Assert.assertEquals("key", one, entry.getKey()); + Assert.assertEquals("value", one, entry.getValue().getValue()); + Assert.assertEquals("time bucket", 1, entry.getValue().getTimeBucket()); + testMeta.defaultBucket.teardown(); + } + + @Test + public void testCommitted() + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testCheckpointed(); + testMeta.defaultBucket.committed(10); + Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY); + Assert.assertEquals("value one", one, value); + testMeta.defaultBucket.teardown(); + } + + @Test + public void testCommittedWithOpenReader() throws IOException + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + testGetFromReader(); + Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders(); + Assert.assertTrue("reader open", readers.containsKey(101L)); + + Slice two = ManagedStateTestUtils.getSliceFor("2"); + Slice one = ManagedStateTestUtils.getSliceFor("1"); + + testMeta.defaultBucket.put(two, 101, two); + Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10); + Assert.assertEquals("size", 1, 
unsaved.size()); + testMeta.defaultBucket.committed(10); + + Slice value = testMeta.defaultBucket.get(two, -1, Bucket.ReadSource.MEMORY); + Assert.assertEquals("value two", two, value); + + value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY); + Assert.assertEquals("value one", one, value); + + Assert.assertTrue("reader closed", !readers.containsKey(101L)); + testMeta.defaultBucket.teardown(); + } + + @Test + public void testTeardown() throws IOException + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + testGetFromReader(); + Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders(); + Assert.assertTrue("reader open", readers.containsKey(101L)); + + testMeta.defaultBucket.teardown(); + Assert.assertTrue("reader closed", readers.containsKey(101L)); + } + + @Test + public void testFreeMemory() throws IOException + { + testMeta.defaultBucket.setup(testMeta.managedStateContext); + testGetFromReader(); + long initSize = testMeta.defaultBucket.getSizeInBytes(); + + Slice two = ManagedStateTestUtils.getSliceFor("2"); + testMeta.defaultBucket.put(two, 101, two); + + Assert.assertEquals("size", initSize + (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes()); + + long sizeFreed = testMeta.defaultBucket.freeMemory(); + Assert.assertEquals("size freed", initSize, sizeFreed); + Assert.assertEquals("existing size", (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes()); + testMeta.defaultBucket.teardown(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java new file mode 100644 index 
0000000..ca64693 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import javax.validation.constraints.NotNull; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.Slice; + +public class IncrementalCheckpointManagerTest +{ + class TestMeta extends TestWatcher + { + IncrementalCheckpointManager checkpointManager; + String 
applicationPath; + int operatorId = 1; + MockManagedStateContext managedStateContext; + + @Override + protected void starting(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + + Context.OperatorContext operatorContext = ManagedStateTestUtils.getOperatorContext(operatorId, applicationPath); + managedStateContext = new MockManagedStateContext(operatorContext); + + ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + managedStateContext.getFileAccess().init(); + + checkpointManager = new IncrementalCheckpointManager(); + + managedStateContext.getTimeBucketAssigner().setup(managedStateContext); + managedStateContext.getBucketsFileSystem().setup(managedStateContext); + } + + @Override + protected void finished(Description description) + { + managedStateContext.getTimeBucketAssigner().teardown(); + managedStateContext.getBucketsFileSystem().teardown(); + ManagedStateTestUtils.cleanTargetDir(description); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSerde() throws IOException + { + IncrementalCheckpointManager deserialized = KryoCloneUtils.cloneObject(testMeta.checkpointManager); + Assert.assertNotNull("state window data manager", deserialized); + } + + @Test + public void testSave() throws IOException + { + testMeta.checkpointManager.setup(testMeta.managedStateContext); + Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0); + testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false); + testMeta.checkpointManager.teardown(); + + testMeta.checkpointManager = new IncrementalCheckpointManager(); + testMeta.checkpointManager.setup(testMeta.managedStateContext); + @SuppressWarnings("unchecked") + Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5After = (Map<Long, Map<Slice, 
Bucket.BucketedValue>>) + testMeta.checkpointManager.load(testMeta.operatorId, 10); + + Assert.assertEquals("saved", buckets5, buckets5After); + testMeta.checkpointManager.teardown(); + } + + @Test + public void testTransferWindowFiles() throws IOException, InterruptedException + { + testMeta.checkpointManager.setup(testMeta.managedStateContext); + + Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0); + testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false); + //Need to synchronously call transfer window files so shutting down the other thread. + testMeta.checkpointManager.teardown(); + Thread.sleep(500); + + testMeta.checkpointManager.committed(testMeta.operatorId, 10); + testMeta.checkpointManager.transferWindowFiles(); + + for (int i = 0; i < 5; i++) { + ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i, + buckets5.get((long)i), 1); + } + } + + @Test + public void testCommitted() throws IOException, InterruptedException + { + CountDownLatch latch = new CountDownLatch(5); + MockBucketsFileSystem mockBucketsFileSystem = new MockBucketsFileSystem(latch); + + testMeta.managedStateContext.setBucketsFileSystem(mockBucketsFileSystem); + + mockBucketsFileSystem.setup(testMeta.managedStateContext); + testMeta.checkpointManager.setup(testMeta.managedStateContext); + + Map<Long, Map<Slice, Bucket.BucketedValue>> data = ManagedStateTestUtils.getTestData(0, 5, 0); + testMeta.checkpointManager.save(data, testMeta.operatorId, 10, false); + testMeta.checkpointManager.committed(testMeta.operatorId, 10); + latch.await(); + testMeta.checkpointManager.teardown(); + Thread.sleep(500); + + for (int i = 0; i < 5; i++) { + ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i, data.get((long)i), 1); + } + } + + @Test + public void testPurge() throws IOException, InterruptedException + { + FileSystem fileSystem = FileSystem.newInstance(new 
Configuration()); + + testTransferWindowFiles(); + RemoteIterator<LocatedFileStatus> iterator = fileSystem.listLocatedStatus( + new Path(testMeta.applicationPath + "/bucket_data")); + Assert.assertTrue(iterator.hasNext()); + + testMeta.managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(200); + + iterator = fileSystem.listLocatedStatus(new Path(testMeta.applicationPath + "/bucket_data")); + if (iterator.hasNext()) { + Assert.fail("All buckets should be deleted"); + } + } + + static class MockBucketsFileSystem extends BucketsFileSystem + { + private final transient CountDownLatch latch; + + public MockBucketsFileSystem(@NotNull CountDownLatch latch) + { + super(); + this.latch = Preconditions.checkNotNull(latch); + } + + @Override + protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) + throws IOException + { + super.writeBucketData(windowId, bucketId, data); + if (windowId == 10) { + latch.countDown(); + } + } + } + + private static final transient Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManagerTest.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java new file mode 100644 index 0000000..f86b5d3 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.Slice; + +public class ManagedStateImplTest +{ + + class TestMeta extends TestWatcher + { + ManagedStateImpl managedState; + Context.OperatorContext operatorContext; + String applicationPath; + + @Override + protected void starting(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + managedState = new ManagedStateImpl(); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + + operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath); + } + + @Override + protected void finished(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSerde() throws IOException + { + 
ManagedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState); + Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets()); + } + + @Test + public void testSimplePutGet() + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + testMeta.managedState.beginWindow(System.currentTimeMillis()); + testMeta.managedState.put(0, one, one); + Slice value = testMeta.managedState.getSync(0, one); + testMeta.managedState.endWindow(); + + Assert.assertEquals("value of one", one, value); + testMeta.managedState.teardown(); + } + + @Test + public void testAsyncGetFromFlash() throws ExecutionException, InterruptedException + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + testMeta.managedState.beginWindow(System.currentTimeMillis()); + testMeta.managedState.put(0, one, one); + Future<Slice> valFuture = testMeta.managedState.getAsync(0, one); + Slice value = valFuture.get(); + + Assert.assertEquals("value of one", one, value); + testMeta.managedState.teardown(); + } + + @Test + public void testIncrementalCheckpoint() + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(time); + testMeta.managedState.put(0, one, one); + testMeta.managedState.endWindow(); + testMeta.managedState.beforeCheckpoint(time); + + Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0); + Assert.assertEquals("value of one", one, defaultBucket.getCheckpointedData().get(time).get(one).getValue()); + + Slice two = ManagedStateTestUtils.getSliceFor("2"); + testMeta.managedState.beginWindow(time + 1); + testMeta.managedState.put(0, two, two); + testMeta.managedState.endWindow(); + testMeta.managedState.beforeCheckpoint(time + 1); + + 
Assert.assertEquals("value of two", two, defaultBucket.getCheckpointedData().get(time + 1).get(two).getValue()); + testMeta.managedState.teardown(); + } + + @Test + public void testAsyncGetFromCheckpoint() throws ExecutionException, InterruptedException + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(time); + testMeta.managedState.put(0, one, one); + testMeta.managedState.endWindow(); + testMeta.managedState.beforeCheckpoint(time); + + Future<Slice> valFuture = testMeta.managedState.getAsync(0, one); + Assert.assertEquals("value of one", one, valFuture.get()); + testMeta.managedState.teardown(); + } + + @Test + public void testCommitted() + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + Slice two = ManagedStateTestUtils.getSliceFor("2"); + commitHelper(one, two); + Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0); + Assert.assertEquals("value of one", one, defaultBucket.getCommittedData().get(one).getValue()); + + Assert.assertNull("value of two", defaultBucket.getCommittedData().get(two)); + testMeta.managedState.teardown(); + } + + @Test + public void testAsyncGetFromCommitted() throws ExecutionException, InterruptedException + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + Slice two = ManagedStateTestUtils.getSliceFor("2"); + commitHelper(one, two); + Future<Slice> valFuture = testMeta.managedState.getAsync(0, one); + Assert.assertEquals("value of one", one, valFuture.get()); + } + + private void commitHelper(Slice one, Slice two) + { + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(time); + testMeta.managedState.put(0, one, one); + testMeta.managedState.endWindow(); + testMeta.managedState.beforeCheckpoint(time); + + testMeta.managedState.beginWindow(time + 1); + 
testMeta.managedState.put(0, two, two); + testMeta.managedState.endWindow(); + testMeta.managedState.beforeCheckpoint(time + 1); + + testMeta.managedState.committed(time); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java new file mode 100644 index 0000000..f2251bd --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.state.managed; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; + +import javax.annotation.Nullable; + +import org.junit.Assert; +import org.junit.runner.Description; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; + +import com.google.common.base.Function; +import com.google.common.collect.Maps; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.fileaccess.FileAccess; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.comparator.SliceComparator; +import com.datatorrent.netlet.util.Slice; + +public class ManagedStateTestUtils +{ + + static void cleanTargetDir(Description description) + { + try { + File out = new File("target/" + description.getClassName()); + if (out.exists()) { + FileUtils.deleteDirectory(out); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> unsavedBucket, + int keysPerTimeBucket) throws IOException + { + RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId); + TreeMap<Slice, Slice> fromDisk = Maps.newTreeMap(new SliceComparator()); + int size = 0; + while (iterator.hasNext()) { + LocatedFileStatus fileStatus = iterator.next(); + + String timeBucketStr = fileStatus.getPath().getName(); + if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) { + //ignoring meta file + continue; + } + + LOG.debug("bucket {} time-bucket {}", bucketId, timeBucketStr); + + FileAccess.FileReader reader = fileAccess.getReader(bucketId, timeBucketStr); + + reader.readFully(fromDisk); + size += keysPerTimeBucket; + 
Assert.assertEquals("size of bucket " + bucketId, size, fromDisk.size()); + } + + Assert.assertEquals("size of bucket " + bucketId, unsavedBucket.size(), fromDisk.size()); + + Map<Slice, Slice> testBucket = Maps.transformValues(unsavedBucket, new Function<Bucket.BucketedValue, Slice>() + { + @Override + public Slice apply(@Nullable Bucket.BucketedValue input) + { + assert input != null; + return input.getValue(); + } + }); + Assert.assertEquals("data of bucket" + bucketId, testBucket, fromDisk); + } + + static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart) + { + Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap(); + for (int i = startBucket; i < endBucket; i++) { + Map<Slice, Bucket.BucketedValue> bucketData = getTestBucketData(keyStart, 100); + data.put((long)i, bucketData); + } + return data; + } + + static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart) + { + Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap(); + for (int j = 0; j < 5; j++) { + Slice keyVal = new Slice(Integer.toString(keyStart).getBytes()); + bucketData.put(keyVal, new Bucket.BucketedValue(timeBucketStart + j, keyVal)); + keyStart++; + } + return bucketData; + } + + static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath) + { + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes); + } + + static Context.OperatorContext getOperatorContext(int operatorId) + { + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes); + } + + private static final transient Logger LOG = 
LoggerFactory.getLogger(ManagedStateTestUtils.class); + + static Slice getSliceFor(String x) + { + return new Slice(x.getBytes()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java new file mode 100644 index 0000000..ac4db39 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.Slice; + +public class ManagedTimeStateImplTest +{ + class TestMeta extends TestWatcher + { + ManagedTimeStateImpl managedState; + Context.OperatorContext operatorContext; + String applicationPath; + + @Override + protected void starting(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + managedState = new ManagedTimeStateImpl(); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + + operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath); + } + + @Override + protected void finished(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSerde() throws IOException + { + ManagedTimeStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState); + Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets()); + } + + @Test + public void testAsyncGetFromReaders() throws IOException, ExecutionException, InterruptedException + { + Slice zero = ManagedStateTestUtils.getSliceFor("0"); + long time = System.currentTimeMillis(); + + 
testMeta.managedState.setup(testMeta.operatorContext); + + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time); + testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0); + ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1); + + Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero); + + Assert.assertEquals("value of zero", zero, valFuture.get()); + testMeta.managedState.teardown(); + } + + @Test + public void testPutGetWithTime() + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(0); + testMeta.managedState.put(0, time, one, one); + Slice value = testMeta.managedState.getSync(0, time, one); + testMeta.managedState.endWindow(); + + Assert.assertEquals("value of one", one, value); + testMeta.managedState.teardown(); + } + + @Test + public void testAsyncGetWithTime() throws ExecutionException, InterruptedException + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(0); + testMeta.managedState.put(0, time, one, one); + Future<Slice> valFuture = testMeta.managedState.getAsync(0, time, one); + Slice value = valFuture.get(); + + Assert.assertEquals("value of one", one, value); + testMeta.managedState.teardown(); + } + + @Test + public void testRecovery() throws ExecutionException, InterruptedException + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(0); + testMeta.managedState.put(0, time, one, one); + testMeta.managedState.endWindow(); + testMeta.managedState.beforeCheckpoint(0); + + testMeta.managedState.teardown(); 
+ + //there is a failure and the operator is re-deployed. + testMeta.managedState.setStateTracker(new StateTracker()); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath); + attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L); + Context.OperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); + + testMeta.managedState.setup(operatorContext); + + Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0); + Assert.assertEquals("value of one", one, defaultBucket.get(one, time, Bucket.ReadSource.MEMORY)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java new file mode 100644 index 0000000..523a10a --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.Slice; + +public class ManagedTimeUnifiedStateImplTest +{ + class TestMeta extends TestWatcher + { + ManagedTimeUnifiedStateImpl managedState; + Context.OperatorContext operatorContext; + String applicationPath; + + @Override + protected void starting(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + managedState = new ManagedTimeUnifiedStateImpl(); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + + operatorContext = ManagedStateTestUtils.getOperatorContext(9, applicationPath); + } + + @Override + protected void finished(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSerde() throws IOException + { + ManagedTimeUnifiedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState); + Assert.assertEquals("num 
buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets()); + } + + @Test + public void testSimplePutGet() + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(0); + testMeta.managedState.put(time, one, one); + Slice value = testMeta.managedState.getSync(time, one); + testMeta.managedState.endWindow(); + + Assert.assertEquals("value of one", one, value); + testMeta.managedState.teardown(); + } + + @Test + public void testAsyncGet() throws ExecutionException, InterruptedException + { + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.setup(testMeta.operatorContext); + long time = System.currentTimeMillis(); + testMeta.managedState.beginWindow(0); + testMeta.managedState.put(time, one, one); + Future<Slice> valFuture = testMeta.managedState.getAsync(time, one); + Slice value = valFuture.get(); + + Assert.assertEquals("value of one", one, value); + testMeta.managedState.teardown(); + } + + @Test + public void testSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException + { + Slice zero = ManagedStateTestUtils.getSliceFor("0"); + long time = System.currentTimeMillis(); + + testMeta.managedState.setup(testMeta.operatorContext); + + long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time); + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); + + //write data to disk explicitly + testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0); + ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(), + unsavedBucket0, 1); + + Slice value = testMeta.managedState.getSync(time, zero); + + Assert.assertEquals("value of zero", zero, value); + testMeta.managedState.teardown(); + } + + @Test + public void 
testAsyncSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException + { + Slice zero = ManagedStateTestUtils.getSliceFor("0"); + long time = System.currentTimeMillis(); + + testMeta.managedState.setup(testMeta.operatorContext); + + long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time); + Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket); + + //write data to disk explicitly + testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0); + ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), testMeta.operatorContext.getId(), + unsavedBucket0, 1); + + Future<Slice> valFuture = testMeta.managedState.getAsync(time, zero); + + Assert.assertEquals("value of zero", zero, valFuture.get()); + testMeta.managedState.teardown(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java new file mode 100644 index 0000000..8ae4db7 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.state.managed; + +import java.util.Comparator; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.fileaccess.FileAccess; +import com.datatorrent.lib.fileaccess.TFileImpl; +import com.datatorrent.lib.util.comparator.SliceComparator; +import com.datatorrent.netlet.util.Slice; + +class MockManagedStateContext implements ManagedStateContext +{ + private TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl(); + private Comparator<Slice> keyComparator = new SliceComparator(); + private BucketsFileSystem bucketsFileSystem = new BucketsFileSystem(); + private TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner(); + + private final Context.OperatorContext operatorContext; + + public MockManagedStateContext(Context.OperatorContext operatorContext) + { + this.operatorContext = operatorContext; + } + + @Override + public FileAccess getFileAccess() + { + return fileAccess; + } + + @Override + public Comparator<Slice> getKeyComparator() + { + return keyComparator; + } + + public BucketsFileSystem getBucketsFileSystem() + { + return bucketsFileSystem; + } + + @Override + public TimeBucketAssigner getTimeBucketAssigner() + { + return timeBucketAssigner; + } + + @Override + public Context.OperatorContext getOperatorContext() + { + return operatorContext; + } + + void setFileAccess(TFileImpl.DTFileImpl fileAccess) + { + this.fileAccess = fileAccess; + } + + void setKeyComparator(Comparator<Slice> keyComparator) + { + this.keyComparator = keyComparator; + } + + void 
setBucketsFileSystem(BucketsFileSystem bucketsFileSystem) + { + this.bucketsFileSystem = bucketsFileSystem; + } + + void setTimeBucketAssigner(TimeBucketAssigner timeBucketAssigner) + { + this.timeBucketAssigner = timeBucketAssigner; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java new file mode 100644 index 0000000..8a3e521 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.state.managed; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.fileaccess.FileAccessFSImpl; +import com.datatorrent.netlet.util.Slice; + +public class StateTrackerTest +{ + static class TestMeta extends TestWatcher + { + MockManagedStateImpl managedState; + Context.OperatorContext operatorContext; + String applicationPath; + + @Override + protected void starting(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + managedState = new MockManagedStateImpl(); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data"); + + managedState.setNumBuckets(2); + managedState.setMaxMemorySize(100); + + operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath); + } + + @Override + protected void finished(Description description) + { + ManagedStateTestUtils.cleanTargetDir(description); + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testEviction() throws InterruptedException + { + testMeta.managedState.latch = new CountDownLatch(1); + testMeta.managedState.setup(testMeta.operatorContext); + + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.beginWindow(System.currentTimeMillis()); + testMeta.managedState.put(1, one, one); + testMeta.managedState.endWindow(); + + testMeta.managedState.latch.await(); + testMeta.managedState.teardown(); + Assert.assertEquals("freed bucket", Lists.newArrayList(1L), testMeta.managedState.freedBuckets); + } + + @Test + 
public void testMultipleEvictions() throws InterruptedException + { + testMeta.managedState.latch = new CountDownLatch(2); + testMeta.managedState.setup(testMeta.operatorContext); + + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.beginWindow(System.currentTimeMillis()); + testMeta.managedState.put(1, one, one); + + Slice two = ManagedStateTestUtils.getSliceFor("2"); + testMeta.managedState.put(2, two, two); + testMeta.managedState.endWindow(); + + testMeta.managedState.latch.await(); + testMeta.managedState.teardown(); + Assert.assertEquals("freed bucket", Lists.newArrayList(1L, 2L), testMeta.managedState.freedBuckets); + } + + @Test + public void testBucketPrevention() throws InterruptedException + { + testMeta.managedState.setDurationPreventingFreeingSpace(Duration.standardDays(2)); + testMeta.managedState.setStateTracker(new MockStateTracker()); + testMeta.managedState.latch = new CountDownLatch(1); + + testMeta.managedState.setup(testMeta.operatorContext); + Slice one = ManagedStateTestUtils.getSliceFor("1"); + testMeta.managedState.beginWindow(System.currentTimeMillis()); + testMeta.managedState.put(1, one, one); + + Slice two = ManagedStateTestUtils.getSliceFor("2"); + testMeta.managedState.put(2, two, two); + testMeta.managedState.endWindow(); + + testMeta.managedState.latch.await(); + testMeta.managedState.teardown(); + Assert.assertEquals("no buckets triggered", 0, testMeta.managedState.freedBuckets.size()); + } + + private static class MockManagedStateImpl extends ManagedStateImpl + { + CountDownLatch latch; + List<Long> freedBuckets = Lists.newArrayList(); + + @Override + protected Bucket newBucket(long bucketId) + { + return new MockDefaultBucket(bucketId); + } + } + + private static class MockDefaultBucket extends Bucket.DefaultBucket + { + + protected MockDefaultBucket(long bucketId) + { + super(bucketId); + } + + @Override + public long freeMemory() throws IOException + { + long freedBytes = super.freeMemory(); + 
((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId()); + ((MockManagedStateImpl)managedStateContext).latch.countDown(); + return freedBytes; + } + + @Override + public long getSizeInBytes() + { + return 600; + } + } + + private static class MockStateTracker extends StateTracker + { + + @Override + public void run() + { + super.run(); + ((MockManagedStateImpl)managedStateImpl).latch.countDown(); + } + } + +}
