Repository: apex-malhar Updated Branches: refs/heads/master 0a87bc0a5 -> 7b019fa1b
APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter and setter for IncrementalCheckpointManager Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d25b9185 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d25b9185 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d25b9185 Branch: refs/heads/master Commit: d25b9185b793a2f3745b774b700f0684a1bbcdda Parents: c4a1129 Author: Chaitanya <[email protected]> Authored: Thu Jul 7 22:38:38 2016 +0530 Committer: Chaitanya <[email protected]> Committed: Thu Jul 7 22:38:38 2016 +0530 ---------------------------------------------------------------------- .../state/managed/AbstractManagedStateImpl.java | 42 +++++++++++++------- .../malhar/lib/state/managed/StateTracker.java | 2 +- 2 files changed, 28 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java index 196ea69..b5b9f8c 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java @@ -144,7 +144,7 @@ public abstract class AbstractManagedStateImpl protected transient ExecutorService readerService; @NotNull - protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager(); + private IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager(); @NotNull protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem(); @@ -203,22 +203,24 @@ public abstract class AbstractManagedStateImpl //delete all the wal files with windows > activationWindow. //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data. try { - for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) { - if (recoveredWindow <= activationWindow) { - @SuppressWarnings("unchecked") - Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>) - checkpointManager.load(operatorContext.getId(), recoveredWindow); - if (recoveredData != null && !recoveredData.isEmpty()) { - for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) { - int bucketIdx = prepareBucket(entry.getKey()); - buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue()); + long[] recoveredWindows = checkpointManager.getWindowIds(operatorContext.getId()); + if (recoveredWindows != null) { + for (long recoveredWindow : recoveredWindows) { + if (recoveredWindow <= activationWindow) { + @SuppressWarnings("unchecked") + Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>) + checkpointManager.load(operatorContext.getId(), recoveredWindow); + if (recoveredData != null && !recoveredData.isEmpty()) { + for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) { + int bucketIdx = prepareBucket(entry.getKey()); + buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue()); + } } + checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow, + true /*skipWritingToWindowFile*/); + } else { + checkpointManager.delete(operatorContext.getId(), recoveredWindow); } - checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow, - true /*skipWritingToWindowFile*/); - - } else { - checkpointManager.delete(operatorContext.getId(), recoveredWindow); } } } catch (IOException e) { @@ -536,6 +538,16 @@ public abstract class AbstractManagedStateImpl this.durationPreventingFreeingSpace = durationPreventingFreeingSpace; } + public IncrementalCheckpointManager getCheckpointManager() + { + return checkpointManager; + } + + public void setCheckpointManager(@NotNull IncrementalCheckpointManager checkpointManager) + { + this.checkpointManager = Preconditions.checkNotNull(checkpointManager); + } + static class ValueFetchTask implements Callable<Slice> { private final Bucket bucket; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java index 4813c25..5678107 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java @@ -122,7 +122,7 @@ class StateTracker extends TimerTask synchronized (bucket) { long sizeFreed; try { - sizeFreed = bucket.freeMemory(managedStateImpl.checkpointManager.getLastTransferredWindow()); + sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow()); LOG.debug("bucket freed {} {}", bucketId, sizeFreed); } catch (IOException e) { managedStateImpl.throwable.set(e);
