Repository: apex-malhar
Updated Branches:
  refs/heads/master 0a87bc0a5 -> 7b019fa1b


APEXMALHAR-2136 1) Fixed the null pointer exception issue. 2) Added getter and 
setter for IncrementalCheckpointManager


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d25b9185
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d25b9185
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d25b9185

Branch: refs/heads/master
Commit: d25b9185b793a2f3745b774b700f0684a1bbcdda
Parents: c4a1129
Author: Chaitanya <[email protected]>
Authored: Thu Jul 7 22:38:38 2016 +0530
Committer: Chaitanya <[email protected]>
Committed: Thu Jul 7 22:38:38 2016 +0530

----------------------------------------------------------------------
 .../state/managed/AbstractManagedStateImpl.java | 42 +++++++++++++-------
 .../malhar/lib/state/managed/StateTracker.java  |  2 +-
 2 files changed, 28 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 196ea69..b5b9f8c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -144,7 +144,7 @@ public abstract class AbstractManagedStateImpl
   protected transient ExecutorService readerService;
 
   @NotNull
-  protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
+  private IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
 
   @NotNull
   protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
@@ -203,22 +203,24 @@ public abstract class AbstractManagedStateImpl
       //delete all the wal files with windows > activationWindow.
       //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
       try {
-        for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) {
-          if (recoveredWindow <= activationWindow) {
-            @SuppressWarnings("unchecked")
-            Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
-                checkpointManager.load(operatorContext.getId(), recoveredWindow);
-            if (recoveredData != null && !recoveredData.isEmpty()) {
-              for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
-                int bucketIdx = prepareBucket(entry.getKey());
-                buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+        long[] recoveredWindows = checkpointManager.getWindowIds(operatorContext.getId());
+        if (recoveredWindows != null) {
+          for (long recoveredWindow : recoveredWindows) {
+            if (recoveredWindow <= activationWindow) {
+              @SuppressWarnings("unchecked")
+              Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+                  checkpointManager.load(operatorContext.getId(), recoveredWindow);
+              if (recoveredData != null && !recoveredData.isEmpty()) {
+                for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
+                  int bucketIdx = prepareBucket(entry.getKey());
+                  buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+                }
               }
+              checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
+                  true /*skipWritingToWindowFile*/);
+            } else {
+              checkpointManager.delete(operatorContext.getId(), recoveredWindow);
             }
-            checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
-                true /*skipWritingToWindowFile*/);
-
-          } else {
-            checkpointManager.delete(operatorContext.getId(), recoveredWindow);
           }
         }
       } catch (IOException e) {
@@ -536,6 +538,16 @@ public abstract class AbstractManagedStateImpl
     this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
   }
 
+  public IncrementalCheckpointManager getCheckpointManager()
+  {
+    return checkpointManager;
+  }
+
+  public void setCheckpointManager(@NotNull IncrementalCheckpointManager checkpointManager)
+  {
+    this.checkpointManager = Preconditions.checkNotNull(checkpointManager);
+  }
+
   static class ValueFetchTask implements Callable<Slice>
   {
     private final Bucket bucket;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d25b9185/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
index 4813c25..5678107 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -122,7 +122,7 @@ class StateTracker extends TimerTask
             synchronized (bucket) {
               long sizeFreed;
               try {
-                sizeFreed = bucket.freeMemory(managedStateImpl.checkpointManager.getLastTransferredWindow());
+                sizeFreed = bucket.freeMemory(managedStateImpl.getCheckpointManager().getLastTransferredWindow());
                 LOG.debug("bucket freed {} {}", bucketId, sizeFreed);
               } catch (IOException e) {
                 managedStateImpl.throwable.set(e);

Reply via email to