Repository: apex-malhar Updated Branches: refs/heads/master a75b093df -> a25b6140c
APEXMALHAR-2335 APEXMALHAR-2333 APEXMALHAR-2334 #resolve #comment Problems on StateTracker Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/a25b6140 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/a25b6140 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/a25b6140 Branch: refs/heads/master Commit: a25b6140cac9653b10657061a2b59d7404ecf56a Parents: a75b093 Author: brightchen <[email protected]> Authored: Thu Nov 10 14:59:52 2016 -0800 Committer: brightchen <[email protected]> Committed: Mon Nov 14 16:22:40 2016 -0800 ---------------------------------------------------------------------- .../state/ManagedStateBenchmarkApp.java | 2 + .../benchmark/state/StoreOperator.java | 19 ++- .../state/ManagedStateBenchmarkAppTest.java | 2 +- .../state/managed/AbstractManagedStateImpl.java | 4 +- .../apex/malhar/lib/state/managed/Bucket.java | 53 +++++- .../malhar/lib/state/managed/StateTracker.java | 160 +++++++------------ .../serde/DefaultBlockReleaseStrategy.java | 12 +- .../lib/utils/serde/WindowedBlockStream.java | 24 +++ .../lib/state/managed/DefaultBucketTest.java | 10 +- .../lib/state/managed/StateTrackerTest.java | 15 +- 10 files changed, 159 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java index eab02db..be615d0 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java @@ -23,6 +23,7 @@ import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Random; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,6 +79,7 @@ public class ManagedStateBenchmarkApp implements StreamingApplication String basePath = getStoreBasePath(conf); ManagedTimeUnifiedStateImpl store = new ManagedTimeUnifiedStateImpl(); ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath); + store.getTimeBucketAssigner().setBucketSpan(Duration.millis(10000)); return store; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java index ad92b60..f960d15 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java @@ -56,11 +56,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo //this is the store we are going to use private ManagedTimeUnifiedStateImpl store; - private long lastCheckPointWindowId = -1; - private long currentWindowId; private long tupleCount = 0; private int windowCountPerStatistics = 0; private long statisticsBeginTime = 0; + private long applicationBeginTime = 0; + private long totalTupleCount = 0; + private ExecMode execMode = ExecMode.INSERT; private int timeRange = 1000 * 60; @@ -89,11 +90,13 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo @Override public void beginWindow(long windowId) { - currentWindowId = windowId; store.beginWindow(windowId); if (statisticsBeginTime <= 0) { statisticsBeginTime = System.currentTimeMillis(); } + if (applicationBeginTime <= 0) { + applicationBeginTime = System.currentTimeMillis(); + } } @Override @@ -226,7 +229,7 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo public void beforeCheckpoint(long windowId) { store.beforeCheckpoint(windowId); - logger.info("beforeCheckpoint {}", windowId); + logger.debug("beforeCheckpoint {}", windowId); } public ManagedTimeUnifiedStateImpl getStore() @@ -241,8 +244,12 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo private void logStatistics() { - long spentTime = System.currentTimeMillis() - statisticsBeginTime; - logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime); + final long now = System.currentTimeMillis(); + long spentTime = now - statisticsBeginTime; + long totalSpentTime = now - applicationBeginTime; + totalTupleCount += tupleCount; + logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime, + totalTupleCount * 1000 / totalSpentTime); statisticsBeginTime = System.currentTimeMillis(); tupleCount = 0; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java index 5279d36..4f03a10 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java @@ -86,7 +86,7 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp // Create local cluster final LocalMode.Controller lc = lma.getController(); - lc.run(300000); + lc.run(3000000); lc.shutdown(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java index 1c52c31..364bc19 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -46,7 +46,6 @@ import com.google.common.collect.Multimaps; import com.google.common.util.concurrent.Futures; import com.datatorrent.api.Component; -import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.Stateless; @@ -158,8 +157,7 @@ public abstract class AbstractManagedStateImpl @NotNull @FieldSerializer.Bind(JavaSerializer.class) - private Duration checkStateSizeInterval = Duration.millis( - DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue); + private Duration checkStateSizeInterval = Duration.millis(60000); @FieldSerializer.Bind(JavaSerializer.class) private Duration durationPreventingFreeingSpace; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java index cbc4e03..3c18b2f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -435,17 +435,51 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide * Free memory up to the given windowId * This method will be called by another thread. Adding concurrency control to Stream would impact the performance. * This method only calculates the size of the memory that could be released and then sends free memory request to the operator thread + * + * We intend to manage memory by keyStream and valueStream. But the we can't avoid caller use other mechanism to manage memory. + * It is required to cleanup maps in this case. */ @Override public long freeMemory(long windowId) throws IOException { - // calculate the size first and then send the release memory request. It could reduce the chance of conflict and increase the performance. - long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId); + long memoryFreed = 0; + Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> entryIter = committedData.entrySet().iterator(); + while (entryIter.hasNext()) { + Map.Entry<Long, Map<Slice, BucketedValue>> bucketEntry = entryIter.next(); + if (bucketEntry.getKey() > windowId) { + break; + } + + Map<Slice, BucketedValue> windowData = bucketEntry.getValue(); + entryIter.remove(); + + for (Map.Entry<Slice, BucketedValue> entry : windowData.entrySet()) { + memoryFreed += entry.getKey().length + entry.getValue().getSize(); + } + } + + fileCache.clear(); + if (cachedBucketMetas != null) { + + for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) { + FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId()); + if (reader != null) { + memoryFreed += tbm.getSizeInBytes(); + reader.close(); + } + } + } + + sizeInBytes.getAndAdd(-memoryFreed); + + //add the windowId to the queue to let operator thread release memory from keyStream and valueStream windowsForFreeMemory.add(windowId); - return size; + + return memoryFreed; } /** + * Release the memory managed by keyStream and valueStream. * This operation must be called from operator thread. It won't do anything if no memory to be freed */ protected long releaseMemory() @@ -459,10 +493,10 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide memoryFreed += originSize - (keyStream.size() + valueStream.size()); } - if (memoryFreed > 0) { - LOG.debug("Total freed memory size: {}", memoryFreed); - sizeInBytes.getAndAdd(-memoryFreed); - } + //release the free memory immediately + keyStream.releaseAllFreeMemory(); + valueStream.releaseAllFreeMemory(); + return memoryFreed; } @@ -482,6 +516,7 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide @Override public void committed(long committedWindowId) { + releaseMemory(); Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator(); while (stateIterator.hasNext()) { @@ -518,7 +553,9 @@ public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvide } } sizeInBytes.getAndAdd(-memoryFreed); - committedData.put(savedWindow, bucketData); + if (!bucketData.isEmpty()) { + committedData.put(savedWindow, bucketData); + } stateIterator.remove(); } else { break; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java index 5678107..225439f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java @@ -19,11 +19,12 @@ package org.apache.apex.malhar.lib.state.managed; import java.io.IOException; -import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import javax.validation.constraints.NotNull; @@ -31,57 +32,51 @@ import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.lang3.mutable.MutableLong; + import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; /** * Tracks the size of state in memory and evicts buckets. */ class StateTracker extends TimerTask { - //bucket id -> bucket id & time wrapper - private final transient ConcurrentHashMap<Long, BucketIdTimeWrapper> bucketAccessTimes = new ConcurrentHashMap<>(); - - private transient ConcurrentSkipListSet<BucketIdTimeWrapper> bucketHeap; - private final transient Timer memoryFreeService = new Timer(); protected transient AbstractManagedStateImpl managedStateImpl; + private transient long lastUpdateAccessTime = 0; + private final transient Set<Long> accessedBucketIds = Sets.newHashSet(); + private final transient LinkedHashMap<Long, MutableLong> bucketLastAccess = new LinkedHashMap<>(16, 0.75f, true); + + private int updateAccessTimeInterval = 500; + void setup(@NotNull AbstractManagedStateImpl managedStateImpl) { this.managedStateImpl = Preconditions.checkNotNull(managedStateImpl, "managed state impl"); - this.bucketHeap = new ConcurrentSkipListSet<>( - new Comparator<BucketIdTimeWrapper>() - { - //Note: this comparator imposes orderings that are inconsistent with equals. - @Override - public int compare(BucketIdTimeWrapper o1, BucketIdTimeWrapper o2) - { - if (o1.getLastAccessedTime() < o2.getLastAccessedTime()) { - return -1; - } - if (o1.getLastAccessedTime() > o2.getLastAccessedTime()) { - return 1; - } - - return Long.compare(o1.bucketId, o2.bucketId); - } - }); long intervalMillis = managedStateImpl.getCheckStateSizeInterval().getMillis(); memoryFreeService.scheduleAtFixedRate(this, intervalMillis, intervalMillis); } void bucketAccessed(long bucketId) { - BucketIdTimeWrapper idTimeWrapper = bucketAccessTimes.get(bucketId); - if (idTimeWrapper != null) { - bucketHeap.remove(idTimeWrapper); - } else { - idTimeWrapper = new BucketIdTimeWrapper(bucketId); + long now = System.currentTimeMillis(); + if (accessedBucketIds.add(bucketId) || now - lastUpdateAccessTime > updateAccessTimeInterval) { + synchronized (bucketLastAccess) { + for (long id : accessedBucketIds) { + MutableLong lastAccessTime = bucketLastAccess.get(id); + if (lastAccessTime != null) { + lastAccessTime.setValue(now); + } else { + bucketLastAccess.put(id, new MutableLong(now)); + } + } + } + accessedBucketIds.clear(); + lastUpdateAccessTime = now; } - idTimeWrapper.setLastAccessedTime(System.currentTimeMillis()); - bucketHeap.add(idTimeWrapper); } @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @@ -105,88 +100,51 @@ class StateTracker extends TimerTask durationMillis = duration.getMillis(); } - BucketIdTimeWrapper idTimeWrapper; - while (bytesSum > managedStateImpl.getMaxMemorySize() && bucketHeap.size() > 0 && - null != (idTimeWrapper = bucketHeap.first())) { - //trigger buckets to free space - - if (System.currentTimeMillis() - idTimeWrapper.getLastAccessedTime() < durationMillis) { - //if the least recently used bucket cannot free up space because it was accessed within the - //specified duration then subsequent buckets cannot free space as well because this heap is ordered by time. - break; - } - long bucketId = idTimeWrapper.bucketId; - Bucket bucket = managedStateImpl.getBucket(bucketId); - if (bucket != null) { - - synchronized (bucket) { - long sizeFreed; - try { - sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow()); - LOG.debug("bucket freed {} {}", bucketId, sizeFreed); - } catch (IOException e) { - managedStateImpl.throwable.set(e); - throw new RuntimeException("freeing " + bucketId, e); + synchronized (bucketLastAccess) { + long now = System.currentTimeMillis(); + for (Iterator<Map.Entry<Long, MutableLong>> iterator = bucketLastAccess.entrySet().iterator(); + bytesSum > managedStateImpl.getMaxMemorySize() && iterator.hasNext(); ) { + Map.Entry<Long, MutableLong> entry = iterator.next(); + if (now - entry.getValue().longValue() < durationMillis) { + break; + } + long bucketId = entry.getKey(); + Bucket bucket = managedStateImpl.getBucket(bucketId); + if (bucket != null) { + synchronized (bucket) { + long sizeFreed; + try { + sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow()); + LOG.debug("bucket freed {} {}", bucketId, sizeFreed); + } catch (IOException e) { + managedStateImpl.throwable.set(e); + throw new RuntimeException("freeing " + bucketId, e); + } + bytesSum -= sizeFreed; + } + if (bucket.getSizeInBytes() == 0) { + iterator.remove(); } - bytesSum -= sizeFreed; } - bucketHeap.remove(idTimeWrapper); - bucketAccessTimes.remove(bucketId); } } } } } - void teardown() + public int getUpdateAccessTimeInterval() { - memoryFreeService.cancel(); + return updateAccessTimeInterval; } - /** - * Wrapper class for bucket id and the last time the bucket was accessed. - */ - private static class BucketIdTimeWrapper + public void setUpdateAccessTimeInterval(int updateAccessTimeInterval) { - private final long bucketId; - private long lastAccessedTime; - - BucketIdTimeWrapper(long bucketId) - { - this.bucketId = bucketId; - } - - private synchronized long getLastAccessedTime() - { - return lastAccessedTime; - } - - private synchronized void setLastAccessedTime(long lastAccessedTime) - { - this.lastAccessedTime = lastAccessedTime; - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (!(o instanceof BucketIdTimeWrapper)) { - return false; - } - - BucketIdTimeWrapper that = (BucketIdTimeWrapper)o; - //Note: the comparator used with bucket heap imposes orderings that are inconsistent with equals - return bucketId == that.bucketId; - - } + this.updateAccessTimeInterval = updateAccessTimeInterval; + } - @Override - public int hashCode() - { - return (int)(bucketId ^ (bucketId >>> 32)); - } + void teardown() + { + memoryFreeService.cancel(); } private static final Logger LOG = LoggerFactory.getLogger(StateTracker.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java index 93929e4..365cbc3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java @@ -18,9 +18,8 @@ */ package org.apache.apex.malhar.lib.utils.serde; -import java.util.Arrays; - import org.apache.commons.collections.buffer.CircularFifoBuffer; +import org.apache.commons.lang3.mutable.MutableInt; /** * This implementation get the minimum number of free blocks in the period to release. @@ -30,7 +29,6 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy { public static final int DEFAULT_PERIOD = 60; // 60 reports private CircularFifoBuffer freeBlockNumQueue; - private Integer[] tmpArray; public DefaultBlockReleaseStrategy() { @@ -40,8 +38,6 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy public DefaultBlockReleaseStrategy(int period) { freeBlockNumQueue = new CircularFifoBuffer(period); - tmpArray = new Integer[period]; - Arrays.fill(tmpArray, 0); } /** @@ -54,7 +50,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy if (freeBlockNum < 0) { throw new IllegalArgumentException("The number of free blocks could not less than zero."); } - freeBlockNumQueue.add(freeBlockNum); + freeBlockNumQueue.add(new MutableInt(freeBlockNum)); } /** @@ -66,7 +62,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy { int minNum = Integer.MAX_VALUE; for (Object num : freeBlockNumQueue) { - minNum = Math.min((Integer)num, minNum); + minNum = Math.min(((MutableInt)num).intValue(), minNum); } return minNum; } @@ -89,7 +85,7 @@ public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy * decrease by released blocks */ for (Object num : freeBlockNumQueue) { - freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0)); + ((MutableInt)num).setValue(Math.max(((MutableInt)num).intValue() - numReleasedBlocks, 0)); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java index fa4cd73..53710f8 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java @@ -246,4 +246,28 @@ public class WindowedBlockStream extends BlockStream implements WindowListener, } } + /** + * This method releases all free memory immediately. + * This method will not be controlled by release strategy + */ + public void releaseAllFreeMemory() + { + int releasedBlocks = 0; + + Iterator<Integer> iter = freeBlockIds.iterator(); + while (iter.hasNext()) { + //release blocks + int blockId = iter.next(); + iter.remove(); + blocks.remove(blockId); + releasedBlocks++; + } + + /** + * report number of released blocks + */ + if (releasedBlocks > 0) { + releaseStrategy.releasedBlocks(releasedBlocks); + } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java index 6645a98..f7e24de 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java @@ -32,7 +32,6 @@ import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource; import org.apache.apex.malhar.lib.utils.serde.AffixSerde; import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer; import org.apache.apex.malhar.lib.utils.serde.StringSerde; -import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream; import com.google.common.primitives.Longs; @@ -223,13 +222,8 @@ public class DefaultBucketTest testMeta.defaultBucket.get(keySlice, -1, ReadSource.MEMORY); long sizeFreed = currentSize - testMeta.defaultBucket.getSizeInBytes(); - SerializationBuffer tmpBuffer = new SerializationBuffer(new WindowedBlockStream()); - tmpBuffer.writeBytes(keyPrefix); - tmpBuffer.writeString(key); - tmpBuffer.writeString(value); - int expectedFreedSize = tmpBuffer.toSlice().toByteArray().length; //key prefix, key length, key; value length, value - Assert.assertEquals("size freed", expectedFreedSize, sizeFreed); - Assert.assertEquals("existing size", currentSize - expectedFreedSize, testMeta.defaultBucket.getSizeInBytes()); + Assert.assertEquals("size freed", initSize, sizeFreed); + Assert.assertEquals("existing size", currentSize - initSize, testMeta.defaultBucket.getSizeInBytes()); testMeta.defaultBucket.teardown(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/a25b6140/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java index 07e141a..ca78186 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java @@ -20,7 +20,7 @@ package org.apache.apex.malhar.lib.state.managed; import java.io.IOException; -import java.util.List; +import java.util.Set; import java.util.concurrent.CountDownLatch; import org.joda.time.Duration; @@ -30,7 +30,7 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.datatorrent.api.Context; import com.datatorrent.lib.fileaccess.FileAccessFSImpl; @@ -82,7 +82,7 @@ public class StateTrackerTest testMeta.managedState.latch.await(); testMeta.managedState.teardown(); - Assert.assertEquals("freed bucket", Lists.newArrayList(1L), testMeta.managedState.freedBuckets); + Assert.assertEquals("freed bucket", Sets.newHashSet(1L), testMeta.managedState.freedBuckets); } @Test @@ -101,7 +101,7 @@ public class StateTrackerTest testMeta.managedState.latch.await(); testMeta.managedState.teardown(); - Assert.assertEquals("freed bucket", Lists.newArrayList(1L, 2L), testMeta.managedState.freedBuckets); + Assert.assertEquals("freed bucket", Sets.newHashSet(1L, 2L), testMeta.managedState.freedBuckets); } @Test @@ -128,7 +128,7 @@ public class StateTrackerTest private static class MockManagedStateImpl extends ManagedStateImpl { CountDownLatch latch; - List<Long> freedBuckets = Lists.newArrayList(); + Set<Long> freedBuckets = Sets.newHashSet(); @Override protected Bucket newBucket(long bucketId) @@ -149,8 +149,9 @@ public class StateTrackerTest public long freeMemory(long windowId) throws IOException { long freedBytes = super.freeMemory(windowId); - ((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId()); - ((MockManagedStateImpl)managedStateContext).latch.countDown(); + if (((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId())) { + ((MockManagedStateImpl)managedStateContext).latch.countDown(); + } return freedBytes; }
