APEXMALHAR-2063 Made window data manager use file system wal

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/1b4536ca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/1b4536ca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/1b4536ca

Branch: refs/heads/master
Commit: 1b4536ca0224eae2452d3a052d6f67b67bdec893
Parents: 7d9386d
Author: Chandni Singh <[email protected]>
Authored: Fri Aug 26 20:16:17 2016 -0700
Committer: Chandni Singh <[email protected]>
Committed: Fri Aug 26 21:10:48 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       |  66 +-
 .../kinesis/AbstractKinesisInputOperator.java   | 102 ++-
 .../contrib/nifi/AbstractNiFiInputOperator.java |   8 +-
 .../nifi/AbstractNiFiOutputOperator.java        |   4 +-
 .../rabbitmq/AbstractRabbitMQInputOperator.java |   8 +-
 .../AbstractRabbitMQOutputOperator.java         |   4 +-
 .../redis/AbstractRedisInputOperator.java       |  12 +-
 .../contrib/kafka/KafkaInputOperatorTest.java   |   4 +-
 .../kinesis/KinesisInputOperatorTest.java       |   2 +-
 .../nifi/NiFiSinglePortInputOperatorTest.java   |   4 +-
 .../rabbitmq/RabbitMQInputOperatorTest.java     |   4 +-
 .../contrib/redis/RedisInputOperatorTest.java   |   4 +-
 .../kafka/AbstractKafkaInputOperator.java       |   9 +-
 ...afkaSinglePortExactlyOnceOutputOperator.java |  16 +-
 .../malhar/kafka/KafkaOutputOperatorTest.java   |   2 +-
 .../db/jdbc/AbstractJdbcPollInputOperator.java  |  16 +-
 .../lib/io/fs/AbstractFileInputOperator.java    |  19 +-
 .../com/datatorrent/lib/io/fs/FileSplitter.java |  16 +-
 .../lib/io/fs/FileSplitterInput.java            |  33 +-
 .../lib/io/jms/AbstractJMSInputOperator.java    |  60 +-
 .../AbstractManagedStateInnerJoinOperator.java  |   4 +-
 .../state/managed/AbstractManagedStateImpl.java |  37 +-
 .../managed/IncrementalCheckpointManager.java   |  51 +-
 .../malhar/lib/wal/FSWindowDataManager.java     | 723 ++++++++++++++-----
 .../apex/malhar/lib/wal/FSWindowReplayWAL.java  | 188 +++++
 .../apex/malhar/lib/wal/FileSystemWAL.java      | 292 +++++---
 .../org/apache/apex/malhar/lib/wal/WAL.java     |   9 +-
 .../apex/malhar/lib/wal/WindowDataManager.java  | 103 ++-
 .../db/jdbc/JdbcPojoPollableOpeartorTest.java   |   4 +-
 .../io/fs/AbstractFileInputOperatorTest.java    |  32 +-
 .../lib/io/fs/FileSplitterInputTest.java        | 201 ++++--
 .../datatorrent/lib/io/fs/FileSplitterTest.java |   2 +-
 .../lib/io/jms/JMSStringInputOperatorTest.java  |  37 +-
 .../lib/io/jms/SQSStringInputOperatorTest.java  |   4 +-
 .../IncrementalCheckpointManagerTest.java       |  19 +-
 .../malhar/lib/wal/FSWindowDataManagerTest.java | 259 ++++---
 .../apex/malhar/lib/wal/FileSystemWALTest.java  |   4 +-
 37 files changed, 1581 insertions(+), 781 deletions(-)
----------------------------------------------------------------------
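
Note on the API change applied throughout this commit: the WindowDataManager methods lose their explicit operator-id parameter and are renamed. getLargestRecoveryWindow() becomes getLargestCompletedWindow(), load(operatorId, windowId) becomes retrieve(windowId) (with retrieveAllPartitions(windowId) for reading every physical partition's state), save(state, operatorId, windowId) drops the operator id, deleteUpTo(operatorId, windowId) becomes committed(windowId), and FSWindowDataManager's setRecoveryPath() becomes setStatePath(). The sketch below distills the migrated idempotent-input pattern from the diffs that follow; the operator name, tuple type, and source-polling stub are illustrative, not part of this commit.

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

// Illustrative operator, not part of this commit.
public class MyIdempotentInputOperator extends BaseOperator
    implements InputOperator, Operator.CheckpointListener
{
  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>();

  private WindowDataManager windowDataManager = new FSWindowDataManager();
  private final Deque<String> currentWindowState = new ArrayDeque<>();
  private transient long currentWindowId;

  @Override
  public void setup(OperatorContext context)
  {
    // the manager now derives the operator id from the context
    windowDataManager.setup(context);
  }

  @Override
  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
    currentWindowState.clear();
    // getLargestCompletedWindow() replaces getLargestRecoveryWindow()
    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
      replay(windowId);
    }
  }

  @SuppressWarnings("unchecked")
  private void replay(long windowId)
  {
    try {
      // retrieve(windowId) replaces load(operatorId, windowId)
      Deque<String> recovered = (Deque<String>)windowDataManager.retrieve(windowId);
      if (recovered != null) {
        for (String tuple : recovered) {
          output.emit(tuple);
        }
      }
    } catch (IOException e) {
      throw new RuntimeException("replay", e);
    }
  }

  @Override
  public void emitTuples()
  {
    // suppress live emission while completed windows are being replayed
    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
      return;
    }
    // ... poll the source, emit, and record each tuple in currentWindowState ...
  }

  @Override
  public void endWindow()
  {
    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
      try {
        // save(state, windowId) replaces save(state, operatorId, windowId)
        windowDataManager.save(currentWindowState, currentWindowId);
      } catch (IOException e) {
        throw new RuntimeException("saving recovery", e);
      }
    }
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
    try {
      // committed(windowId) replaces deleteUpTo(operatorId, windowId)
      windowDataManager.committed(windowId);
    } catch (IOException e) {
      throw new RuntimeException("deleting state", e);
    }
  }
}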


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 21ff181..abf3fad 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -273,7 +273,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
     emitCount = 0;
@@ -285,7 +285,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     try {
       @SuppressWarnings("unchecked")
      Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>)
-          windowDataManager.load(operatorId, windowId);
+          windowDataManager.retrieve(windowId);
       if (recoveredData != null) {
        Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic);
         if (pms != null) {
@@ -325,7 +325,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
           }
         }
       }
-      if(windowId == windowDataManager.getLargestRecoveryWindow()) {
+      if(windowId == windowDataManager.getLargestCompletedWindow()) {
         // Start the consumer at the largest recovery window
         SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
         // Set the offset positions to the consumer
@@ -351,9 +351,9 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       Map<KafkaPartition, Long> carryOn = new HashMap<>(offsetStats);
       offsetTrackHistory.add(Pair.of(currentWindowId, carryOn));
     }
-    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       try {
-        windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, currentWindowId);
       }
       catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
@@ -396,7 +396,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     }
 
     try {
-      windowDataManager.deleteUpTo(operatorId, windowId);
+      windowDataManager.committed(windowId);
     }
     catch (IOException e) {
       throw new RuntimeException("deleting state", e);
@@ -407,7 +407,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   public void activate(OperatorContext ctx)
   {
     if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID &&
-        context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) {
+        context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
       // If it is a replay state, don't start the consumer
       return;
     }
@@ -426,7 +426,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       return;
     }
     int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0);
@@ -521,9 +521,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
      logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
     }
 
-    Collection<WindowDataManager> newManagers = Sets.newHashSet();
-    Set<Integer> deletedOperators =  Sets.newHashSet();
-
+    Set<Integer> deletedOperators = Sets.newHashSet();
+    Collection<Partition<AbstractKafkaInputOperator<K>>> resultPartitions = partitions;
+    boolean numPartitionsChanged = false;
+    
     switch (strategy) {
 
    // For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions
@@ -545,19 +546,21 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
           String clusterId = kp.getKey();
           for (PartitionMetadata pm : kp.getValue()) {
             logger.info("[ONE_TO_ONE]: Create operator partition for cluster 
{}, topic {}, kafka partition {} ", clusterId, getConsumer().topic, 
pm.partitionId());
-            newPartitions.add(createPartition(Sets.newHashSet(new 
KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, 
newManagers));
+            newPartitions.add(createPartition(Sets.newHashSet(new 
KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset));
           }
         }
-        windowDataManager.partitioned(newManagers, deletedOperators);
-        return newPartitions;
+        resultPartitions = newPartitions;
+        numPartitionsChanged = true;
       }
       else if (newWaitingPartition.size() != 0) {
         // add partition for new kafka partition
         for (KafkaPartition newPartition : newWaitingPartition) {
           logger.info("[ONE_TO_ONE]: Add operator partition for cluster {}, 
topic {}, partition {}", newPartition.getClusterId(), getConsumer().topic, 
newPartition.getPartitionId());
-          partitions.add(createPartition(Sets.newHashSet(newPartition), null, 
newManagers));
+          partitions.add(createPartition(Sets.newHashSet(newPartition), null));
         }
         newWaitingPartition.clear();
+        resultPartitions = partitions;
+        numPartitionsChanged = true;
       }
       break;
    // For the 1 to N mapping The initial partition number is defined by stream application
@@ -595,7 +598,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
        newPartitions = new ArrayList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(size);
         for (i = 0; i < size; i++) {
           logger.info("[ONE_TO_MANY]: Create operator partition for kafka 
partition(s): {} ", StringUtils.join(kps[i], ", "));
-          newPartitions.add(createPartition(kps[i], initOffset, newManagers));
+          newPartitions.add(createPartition(kps[i], initOffset));
         }
         // Add the existing partition Ids to the deleted operators
         for (Partition<AbstractKafkaInputOperator<K>> op : partitions)
@@ -604,8 +607,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
         }
 
         newWaitingPartition.clear();
-        windowDataManager.partitioned(newManagers, deletedOperators);
-        return newPartitions;
+        resultPartitions = newPartitions;
+        numPartitionsChanged = true;
       }
       break;
 
@@ -614,13 +617,33 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     default:
       break;
     }
+  
+    if (numPartitionsChanged) {
+      List<WindowDataManager> managers = windowDataManager.partition(resultPartitions.size(), deletedOperators);
+      int i = 0;
+      for (Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
+        partition.getPartitionedInstance().setWindowDataManager(managers.get(i++));
+      }
+    }
+    return resultPartitions;
+  }
 
-    windowDataManager.partitioned(newManagers, deletedOperators);
-    return partitions;
+  /**
+   * Create a new partition with the partition Ids and initial offset positions
+   *
+   * @deprecated use {@link #createPartition(Set, Map)}
+   */
+  @Deprecated
+  protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds,
+      Map<KafkaPartition, Long> initOffsets,
+      @SuppressWarnings("UnusedParameters") Collection<WindowDataManager> newManagers)
+  {
+    return createPartition(pIds, initOffsets);
   }
 
   // Create a new partition with the partition Ids and initial offset positions
-  protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<WindowDataManager> newManagers)
+  protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds,
+      Map<KafkaPartition, Long> initOffsets)
   {
 
    Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this));
@@ -632,7 +655,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
        p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets());
       }
     }
-    newManagers.add(p.getPartitionedInstance().windowDataManager);
 
     PartitionInfo pif = new PartitionInfo();
     pif.kpids = pIds;
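
The repartitioning contract changes the same way in every partitionable operator in this commit: instead of collecting child managers into a Collection and calling partitioned(newManagers, deletedOperators), the parent manager's partition(int, Set<Integer>) returns one child manager per new partition, which the partitioner assigns back. A self-contained sketch; the names HasWindowDataManager and assignManagers are illustrative, not part of this commit.

import java.util.List;
import java.util.Set;

import org.apache.apex.malhar.lib.wal.WindowDataManager;

public class WindowDataManagerPartitioning
{
  // illustrative interface: any operator exposing a WindowDataManager setter
  public interface HasWindowDataManager
  {
    void setWindowDataManager(WindowDataManager windowDataManager);
  }

  public static void assignManagers(WindowDataManager parent,
      List<? extends HasWindowDataManager> newPartitions, Set<Integer> deletedOperatorIds)
  {
    // partition(count, deletedOperators) replaces partitioned(newManagers, deletedOperators);
    // it returns exactly one child manager per new physical partition
    List<WindowDataManager> managers = parent.partition(newPartitions.size(), deletedOperatorIds);
    int i = 0;
    for (HasWindowDataManager partition : newPartitions) {
      partition.setWindowDataManager(managers.get(i++));
    }
  }
}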

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index 2352236..c03df21 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -18,32 +18,46 @@
  */
 package com.datatorrent.contrib.kinesis;
 
-import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.*;
-import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.common.util.Pair;
-import com.datatorrent.lib.util.KryoCloneUtils;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.Sets;
+import javax.validation.Valid;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import javax.validation.Valid;
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.Sets;
 
-import java.io.IOException;
-import java.lang.reflect.Array;
-import java.util.*;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 @DefaultSerializer(JavaSerializer.class)
 class KinesisPair <F, S> extends Pair<F, S>
@@ -168,7 +182,6 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
 
     // Operator partitions
     List<Partition<AbstractKinesisInputOperator>> newPartitions = null;
-    Collection<WindowDataManager> newManagers = Sets.newHashSet();
     Set<Integer> deletedOperators =  Sets.newHashSet();
 
     // initialize the shard positions
@@ -188,7 +201,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
        newPartitions = new ArrayList<Partition<AbstractKinesisInputOperator>>(shards.size());
        for (int i = 0; i < shards.size(); i++) {
          logger.info("[ONE_TO_ONE]: Create operator partition for kinesis partition: " + shards.get(i).getShardId() + ", StreamName: " + this.getConsumer().streamName);
-          newPartitions.add(createPartition(Sets.newHashSet(shards.get(i).getShardId()), initShardPos, newManagers));
+          newPartitions.add(createPartition(Sets.newHashSet(shards.get(i).getShardId()), initShardPos));
         }
       } else if (newWaitingPartition.size() != 0) {
         // Remove the partitions for the closed shards
@@ -196,10 +209,15 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
        // add partition for new kinesis shard
        for (String pid : newWaitingPartition) {
          logger.info("[ONE_TO_ONE]: Add operator partition for kinesis partition " + pid);
-          partitions.add(createPartition(Sets.newHashSet(pid), null, newManagers));
+          partitions.add(createPartition(Sets.newHashSet(pid), null));
        }
        newWaitingPartition.clear();
-        windowDataManager.partitioned(newManagers, deletedOperators);
+        List<WindowDataManager> managers = windowDataManager.partition(partitions.size(), deletedOperators);
+        int i = 0;
+        for (Partition<AbstractKinesisInputOperator> partition : partitions) {
+          partition.getPartitionedInstance().setWindowDataManager(managers.get(i));
+          i++;
+        }
         return partitions;
       }
       break;
@@ -211,11 +229,10 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
         2. Static Partition: Number of DT partitions is fixed, whether the number of shards are increased/decreased.
       */
       int size = initialPartitionCount;
-      if(newWaitingPartition.size() != 0)
-      {
+      if (newWaitingPartition.size() != 0) {
         // Get the list of open shards
         shards = getOpenShards(partitions);
-        if(shardsPerPartition > 1)
+        if (shardsPerPartition > 1)
           size = (int)Math.ceil(shards.size() / (shardsPerPartition * 1.0));
         initShardPos = shardManager.loadInitialShardPositions();
       }
@@ -238,20 +255,26 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
         newWaitingPartition.clear();
       }
       // Add the existing partition Ids to the deleted operators
-      for(Partition<AbstractKinesisInputOperator> op : partitions)
-      {
+      for (Partition<AbstractKinesisInputOperator> op : partitions) {
         deletedOperators.add(op.getPartitionedInstance().operatorId);
       }
+
       for (int i = 0; i < pIds.length; i++) {
         logger.info("[MANY_TO_ONE]: Create operator partition for kinesis 
partition(s): " + StringUtils.join(pIds[i], ", ") + ", StreamName: " + 
this.getConsumer().streamName);
-        if(pIds[i] != null)
-          newPartitions.add(createPartition(pIds[i], initShardPos, 
newManagers));
+
+        if (pIds[i] != null) {
+          newPartitions.add(createPartition(pIds[i], initShardPos));
+        }
       }
       break;
     default:
       break;
     }
-    windowDataManager.partitioned(newManagers, deletedOperators);
+    int i = 0;
+    List<WindowDataManager> managers = windowDataManager.partition(partitions.size(), deletedOperators);
+    for (Partition<AbstractKinesisInputOperator> partition : partitions) {
+      partition.getPartitionedInstance().setWindowDataManager(managers.get(i++));
+    }
     return newPartitions;
   }
 
@@ -371,10 +394,9 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   }
   // Create a new partition with the shardIds and initial shard positions
  private
-  Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos, Collection<WindowDataManager> newManagers)
+  Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos)
  {
    Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(KryoCloneUtils.cloneObject(this));
-    newManagers.add(p.getPartitionedInstance().windowDataManager);
     p.getPartitionedInstance().getConsumer().setShardIds(shardIds);
     p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos);
 
@@ -403,7 +425,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
     operatorId = context.getId();
     windowDataManager.setup(context);
     shardPosition.clear();
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) {
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
       isReplayState = true;
     }
   }
@@ -426,7 +448,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   {
     emitCount = 0;
     currentWindowId = windowId;
-    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -436,7 +458,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
     try {
       @SuppressWarnings("unchecked")
       Map<String, KinesisPair<String, Integer>> recoveredData =
-          (Map<String, KinesisPair<String, Integer>>)windowDataManager.load(operatorId, windowId);
+          (Map<String, KinesisPair<String, Integer>>)windowDataManager.retrieve(windowId);
       if (recoveredData == null) {
         return;
       }
@@ -466,10 +488,10 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   @Override
   public void endWindow()
   {
-    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       context.setCounters(getConsumer().getConsumerStats(shardPosition));
       try {
-        windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, currentWindowId);
       }
       catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
@@ -497,7 +519,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   public void committed(long windowId)
   {
     try {
-      windowDataManager.deleteUpTo(operatorId, windowId);
+      windowDataManager.committed(windowId);
     }
     catch (IOException e) {
       throw new RuntimeException("deleting state", e);
@@ -523,7 +545,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       return;
     }
     int count = consumer.getQueueSize();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
index 1bc81d6..b040d87 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.java
@@ -105,9 +105,9 @@ public abstract class AbstractNiFiInputOperator<T> implements InputOperator
     currentWindowId = windowId;
 
    // if the current window is now less than the largest window, then we need to replay data
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       try {
-        List<T> recoveredData =  (List<T>)this.windowDataManager.load(operatorContextId, windowId);
+        List<T> recoveredData =  (List<T>)this.windowDataManager.retrieve(windowId);
         if (recoveredData == null) {
           return;
         }
@@ -159,7 +159,7 @@ public abstract class AbstractNiFiInputOperator<T> implements InputOperator
 
      // ensure we have the data saved before proceeding in case anything goes wrong
       currentWindowTuples.addAll(tuples);
-      windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId);
+      windowDataManager.save(currentWindowTuples, currentWindowId);
 
       // we now have the data saved so we can complete the transaction
       transaction.complete();
@@ -192,7 +192,7 @@ public abstract class AbstractNiFiInputOperator<T> implements InputOperator
   {
     // save the final state of the window and clear the current window list
     try {
-      windowDataManager.save(currentWindowTuples, operatorContextId, currentWindowId);
+      windowDataManager.save(currentWindowTuples, currentWindowId);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
index d0dc23f..53cf302 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/nifi/AbstractNiFiOutputOperator.java
@@ -104,7 +104,7 @@ public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow();
+    largestRecoveryWindowId = windowDataManager.getLargestCompletedWindow();
 
     // if processing a window we've already seen, don't resend the tuples
     if (currentWindowId <= largestRecoveryWindowId) {
@@ -127,7 +127,7 @@ public abstract class AbstractNiFiOutputOperator<T> extends BaseOperator
 
     // mark that we processed the window
     try {
-      windowDataManager.save("processedWindow", operatorContextId, 
currentWindowId);
+      windowDataManager.save("processedWindow", currentWindowId);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index b842698..847602e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -190,7 +190,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= this.windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -199,7 +199,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
   private void replay(long windowId) {      
     Map<Long, byte[]> recoveredData;
     try {
-      recoveredData = (Map<Long, byte[]>) this.windowDataManager.load(operatorContextId, windowId);
+      recoveredData = (Map<Long, byte[]>)this.windowDataManager.retrieve(windowId);
       if (recoveredData == null) {
         return;
       }
@@ -225,7 +225,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
     }
     
     try {
-      this.windowDataManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId);
+      this.windowDataManager.save(currentWindowRecoveryState, currentWindowId);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
@@ -320,7 +320,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
   public void committed(long windowId)
   {
     try {
-      windowDataManager.deleteUpTo(operatorContextId, windowId);
+      windowDataManager.committed(windowId);
     }
     catch (IOException e) {
       throw new RuntimeException("committing", e);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
index 6043c5b..a19417c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
@@ -109,7 +109,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;    
-    largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow();
+    largestRecoveryWindowId = windowDataManager.getLargestCompletedWindow();
     if (windowId <= largestRecoveryWindowId) {
       // Do not resend already sent tuples
       skipProcessingTuple = true;
@@ -132,7 +132,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
       return;
     }
     try {
-      windowDataManager.save("processedWindow", operatorContextId, 
currentWindowId);
+      windowDataManager.save("processedWindow", currentWindowId);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
index fd0a885..59b320d 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java
@@ -94,7 +94,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
     currentWindowId = windowId;
     scanCallsInCurrentWindow = 0;
     replay = false;
-    if (currentWindowId <= getWindowDataManager().getLargestRecoveryWindow()) {
+    if (currentWindowId <= getWindowDataManager().getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -107,11 +107,11 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
       if (!skipOffsetRecovery) {
         // Begin offset for this window is recovery offset stored for the last
         // window
-        RecoveryState recoveryStateForLastWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId - 1);
+        RecoveryState recoveryStateForLastWindow = (RecoveryState) getWindowDataManager().retrieve(windowId - 1);
        recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow;
       }
       skipOffsetRecovery = false;
-      RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId);
+      RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getWindowDataManager().retrieve(windowId);
      recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow;
       if (recoveryState.scanOffsetAtBeginWindow != null) {
         scanOffset = recoveryState.scanOffsetAtBeginWindow;
@@ -183,9 +183,9 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
     recoveryState.scanOffsetAtBeginWindow = scanOffset;
     recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow;
 
-    if (currentWindowId > getWindowDataManager().getLargestRecoveryWindow()) {
+    if (currentWindowId > getWindowDataManager().getLargestCompletedWindow()) {
       try {
-        getWindowDataManager().save(recoveryState, context.getId(), currentWindowId);
+        getWindowDataManager().save(recoveryState, currentWindowId);
       } catch (IOException e) {
         DTThrowable.rethrow(e);
       }
@@ -233,7 +233,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor
   public void committed(long windowId)
   {
     try {
-      getWindowDataManager().deleteUpTo(context.getId(), windowId);
+      getWindowDataManager().committed(windowId);
     } catch (IOException e) {
       throw new RuntimeException("committing", e);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index e4a4dec..2b4b0f5 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -305,7 +305,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     operator.deactivate();
 
     operator = createAndDeployOperator(true);
-    Assert.assertEquals("largest recovery window", 2, 
operator.getWindowDataManager().getLargestRecoveryWindow());
+    Assert.assertEquals("largest recovery window", 2, 
operator.getWindowDataManager().getLargestCompletedWindow());
 
     operator.beginWindow(1);
     operator.emitTuples();
@@ -390,7 +390,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     if (isIdempotency) {
       FSWindowDataManager storageManager = new FSWindowDataManager();
-      storageManager.setRecoveryPath(testMeta.recoveryDir);
+      storageManager.setStatePath(testMeta.recoveryDir);
       testMeta.operator.setWindowDataManager(storageManager);
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
index e8eff5d..a79d03a 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java
@@ -232,7 +232,7 @@ public class KinesisInputOperatorTest extends KinesisOperatorTestBase
     testMeta.operator.setup(testMeta.context);
     testMeta.operator.activate(testMeta.context);
 
-    Assert.assertEquals("largest recovery window", 1, 
testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+    Assert.assertEquals("largest recovery window", 1, 
testMeta.operator.getWindowDataManager().getLargestCompletedWindow());
 
     testMeta.operator.beginWindow(1);
     testMeta.operator.endWindow();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
index dc56e1c..a1a26ab 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java
@@ -34,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.nifi.remote.protocol.DataPacket;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.util.file.FileUtils;
@@ -45,7 +46,6 @@ import com.datatorrent.contrib.nifi.mock.MockDataPacket;
 import com.datatorrent.contrib.nifi.mock.MockSiteToSiteClient;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
-import org.apache.apex.malhar.lib.wal.WindowDataManager;
 
 public class NiFiSinglePortInputOperatorTest
 {
@@ -115,7 +115,7 @@ public class NiFiSinglePortInputOperatorTest
     windowDataManager.setup(context);
 
     // verify that all the data packets were saved for window #1
-    List<StandardNiFiDataPacket> windowData = (List<StandardNiFiDataPacket>) windowDataManager.load(context.getId(), 1);
+    List<StandardNiFiDataPacket> windowData = (List<StandardNiFiDataPacket>)windowDataManager.retrieve(1);
     Assert.assertNotNull("Should have recovered data", windowData);
     Assert.assertEquals("Size of recovered data should equal size of mock data 
packets",
         dataPackets.size(), windowData.size());

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 4fccffa..ebe4a90 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -221,7 +221,7 @@ public class RabbitMQInputOperatorTest
     operator.setup(context);
     operator.activate(context);
 
-    Assert.assertEquals("largest recovery window", 1, 
operator.getWindowDataManager().getLargestRecoveryWindow());
+    Assert.assertEquals("largest recovery window", 1, 
operator.getWindowDataManager().getLargestCompletedWindow());
     operator.beginWindow(1);
     operator.endWindow();
     Assert.assertEquals("num of messages in window 1", 15, 
sink.collectedTuples.size());
@@ -229,7 +229,7 @@ public class RabbitMQInputOperatorTest
 
     operator.deactivate();
     operator.teardown();
-    operator.getWindowDataManager().deleteUpTo(context.getId(), 1);
+    operator.getWindowDataManager().committed(1);
     publisher.teardown();
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
index bee170d..010c534 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java
@@ -169,7 +169,7 @@ public class RedisInputOperatorTest
       operator.outputPort.setSink(sink);
       operator.setup(context);
 
-      Assert.assertEquals("largest recovery window", 2, 
operator.getWindowDataManager().getLargestRecoveryWindow());
+      Assert.assertEquals("largest recovery window", 2, 
operator.getWindowDataManager().getLargestCompletedWindow());
 
       operator.beginWindow(1);
       operator.emitTuples();
@@ -189,7 +189,7 @@ public class RedisInputOperatorTest
         testStore.remove(entry.getKey());
       }
       sink.collectedTuples.clear();
-      operator.getWindowDataManager().deleteUpTo(context.getId(), 5);
+      operator.getWindowDataManager().committed(5);
       operator.teardown();
     }
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index b05d877..4cf2888 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -223,7 +223,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     }
     if (isIdempotent()) {
       try {
-        windowDataManager.deleteUpTo(operatorId, windowId);
+        windowDataManager.committed(windowId);
       } catch (IOException e) {
         DTThrowable.rethrow(e);
       }
@@ -259,7 +259,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     emitCount = 0;
     currentWindowId = wid;
     windowStartOffset.clear();
-    if (isIdempotent() && wid <= windowDataManager.getLargestRecoveryWindow()) {
+    if (isIdempotent() && wid <= windowDataManager.getLargestCompletedWindow()) {
       replay(wid);
     } else {
       consumerWrapper.afterReplay();
@@ -269,8 +269,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   private void replay(long windowId)
   {
     try {
+      @SuppressWarnings("unchecked")
      Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData =
-          (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.load(operatorId, windowId);
+          (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.retrieve(windowId);
       consumerWrapper.emitImmediately(windowData);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
@@ -294,7 +295,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
        for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : windowStartOffset.entrySet()) {
          windowData.put(e.getKey(), new MutablePair<>(e.getValue(), offsetTrack.get(e.getKey()) - e.getValue()));
         }
-        windowDataManager.save(windowData, operatorId, currentWindowId);
+        windowDataManager.save(windowData, currentWindowId);
       } catch (IOException e) {
         DTThrowable.rethrow(e);
       }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 09ae1cb..29b2584 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -133,7 +133,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   {
     this.windowId = windowId;
 
-    if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId == windowDataManager.getLargestCompletedWindow()) {
       rebuildPartialWindow();
     }
   }
@@ -147,7 +147,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   public void committed(long windowId)
   {
     try {
-      windowDataManager.deleteUpTo(operatorId, windowId);
+      windowDataManager.committed(windowId);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -168,7 +168,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   @Override
   public void endWindow()
   {
-    if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) {
      throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
     }
 
@@ -176,7 +176,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
     getProducer().flush();
 
     try {
-      this.windowDataManager.save(getPartitionsAndOffsets(true), operatorId, windowId);
+      this.windowDataManager.save(getPartitionsAndOffsets(true), windowId);
     } catch (IOException | InterruptedException | ExecutionException e) {
       throw new RuntimeException(e);
     }
@@ -209,7 +209,7 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
   private boolean alreadyInKafka(T message)
   {
-    if ( windowId <= windowDataManager.getLargestRecoveryWindow() ) {
+    if ( windowId <= windowDataManager.getLargestCompletedWindow() ) {
       return true;
     }
 
@@ -265,20 +265,20 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
   private void rebuildPartialWindow()
   {
-    logger.info("Rebuild the partial window after " + 
windowDataManager.getLargestRecoveryWindow());
+    logger.info("Rebuild the partial window after " + 
windowDataManager.getLargestCompletedWindow());
 
     Map<Integer,Long> storedOffsets;
     Map<Integer,Long> currentOffsets;
 
     try {
-      storedOffsets = (Map<Integer,Long>)this.windowDataManager.load(operatorId, windowId);
+      storedOffsets = (Map<Integer,Long>)this.windowDataManager.retrieve(windowId);
       currentOffsets = getPartitionsAndOffsets(true);
     } catch (IOException | ExecutionException | InterruptedException e) {
       throw new RuntimeException(e);
     }
 
     if (currentOffsets == null) {
-      logger.debug("No tuples found while building partial window " + 
windowDataManager.getLargestRecoveryWindow());
+      logger.debug("No tuples found while building partial window " + 
windowDataManager.getLargestCompletedWindow());
       return;
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
index db27be0..5d0e59a 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -242,7 +242,7 @@ public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
     WindowDataManager windowDataManager = new FSWindowDataManager();
     windowDataManager.setup(operatorContext);
     try {
-      windowDataManager.deleteUpTo(operatorContext.getId(),windowDataManager.getLargestRecoveryWindow());
+      windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
     } catch (IOException e) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
index 234e28c..edb3aaa 100644
--- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java
@@ -161,7 +161,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public void activate(OperatorContext context)
   {
     initializePreparedStatement();
-    long largestRecoveryWindow = windowManager.getLargestRecoveryWindow();
+    long largestRecoveryWindow = windowManager.getLargestCompletedWindow();
     if (largestRecoveryWindow == Stateless.WINDOW_ID
        || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
      scanService.scheduleAtFixedRate(new DBPoller(), 0, pollInterval, TimeUnit.MILLISECONDS);
@@ -191,7 +191,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
       try {
         replay(currentWindowId);
         return;
@@ -228,7 +228,7 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowManager.getLargestCompletedWindow()) {
       return;
     }
    int pollSize = (emitQueue.size() < batchSize) ? emitQueue.size() : batchSize;
@@ -247,9 +247,9 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   public void endWindow()
   {
     try {
-      if (currentWindowId > windowManager.getLargestRecoveryWindow()) {
+      if (currentWindowId > windowManager.getLargestCompletedWindow()) {
        currentWindowRecoveryState = new MutablePair<>(lowerBound, lastEmittedRow);
-        windowManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+        windowManager.save(currentWindowRecoveryState, currentWindowId);
       }
     } catch (IOException e) {
       throw new RuntimeException("saving recovery", e);
@@ -301,8 +301,8 @@ public abstract class AbstractJdbcPollInputOperator<T> extends AbstractStoreInpu
   {
 
     try {
-      MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.load(operatorId,
-          windowId);
+      @SuppressWarnings("unchecked")
+      MutablePair<Integer, Integer> recoveredData = (MutablePair<Integer, Integer>)windowManager.retrieve(windowId);
 
       if (recoveredData != null && shouldReplayWindow(recoveredData)) {
         LOG.debug("[Recovering Window ID - {} for record range: {}, {}]", 
windowId, recoveredData.left,
@@ -317,7 +317,7 @@ public abstract class AbstractJdbcPollInputOperator<T> 
extends AbstractStoreInpu
 
       }
 
-      if (currentWindowId == windowManager.getLargestRecoveryWindow()) {
+      if (currentWindowId == windowManager.getLargestCompletedWindow()) {
         try {
           if (!isPollerPartition && rangeQueryPair.getValue() != null) {
             ps = store.getConnection().prepareStatement(

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index a3de5f3..f0e3fbb 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -457,7 +457,7 @@ public abstract class AbstractFileInputOperator<T>
     fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount);
 
     windowDataManager.setup(context);
-    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) {
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
       //reset current file and offset in case of replay
       currentFile = null;
       offset = 0;
@@ -519,7 +519,7 @@ public abstract class AbstractFileInputOperator<T>
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -527,9 +527,9 @@ public abstract class AbstractFileInputOperator<T>
   @Override
   public void endWindow()
   {
-    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       try {
-        windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, currentWindowId);
       } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
@@ -553,7 +553,7 @@ public abstract class AbstractFileInputOperator<T>
    //all the recovery data for a window and then processes only those files which would be hashed
     //to it in the current run.
     try {
-      Map<Integer, Object> recoveryDataPerOperator = windowDataManager.load(windowId);
+      Map<Integer, Object> recoveryDataPerOperator = windowDataManager.retrieveAllPartitions(windowId);
 
       for (Object recovery : recoveryDataPerOperator.values()) {
         @SuppressWarnings("unchecked")
@@ -615,7 +615,7 @@ public abstract class AbstractFileInputOperator<T>
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       return;
     }
 
@@ -836,7 +836,7 @@ public abstract class AbstractFileInputOperator<T>
    List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners);
 
    Collection<Partition<AbstractFileInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
-    Collection<WindowDataManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount);
+    List<WindowDataManager> newManagers = windowDataManager.partition(totalCount, deletedOperators);
 
    KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
     for (int i = 0; i < scanners.size(); i++) {
@@ -888,11 +888,10 @@ public abstract class AbstractFileInputOperator<T>
           pendingFilesIterator.remove();
         }
       }
+      oper.setWindowDataManager(newManagers.get(i));
      newPartitions.add(new DefaultPartition<AbstractFileInputOperator<T>>(oper));
-      newManagers.add(oper.windowDataManager);
     }
 
-    windowDataManager.partitioned(newManagers, deletedOperators);
     LOG.info("definePartitions called returning {} partitions", 
newPartitions.size());
     return newPartitions;
   }
@@ -917,7 +916,7 @@ public abstract class AbstractFileInputOperator<T>
   public void committed(long windowId)
   {
     try {
-      windowDataManager.deleteUpTo(operatorId, windowId);
+      windowDataManager.committed(windowId);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
index cd79a2b..4bb53e5 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java
@@ -145,7 +145,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener
      blockSize = fs.getDefaultBlockSize(new Path(scanner.files.iterator().next()));
     }
 
-    if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) {
+    if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
       blockMetadataIterator = null;
     } else {
       //don't setup scanner while recovery
@@ -175,7 +175,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   {
     blockCount = 0;
     currentWindowId = windowId;
-    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -184,7 +184,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   {
     try {
       @SuppressWarnings("unchecked")
-      LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)windowDataManager.load(operatorId, windowId);
+      LinkedList<FileInfo> recoveredData = (LinkedList<FileInfo>)windowDataManager.retrieve(windowId);
       if (recoveredData == null) {
        //This could happen when there are multiple physical instances and one of them is ahead in processing windows.
         return;
@@ -209,7 +209,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
         }
       }
 
-      if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+      if (windowId == windowDataManager.getLargestCompletedWindow()) {
         scanner.setup(context);
       }
     } catch (IOException e) {
@@ -220,7 +220,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       return;
     }
 
@@ -259,9 +259,9 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   @Override
   public void endWindow()
   {
-    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       try {
-        windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, currentWindowId);
       } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
@@ -388,7 +388,7 @@ public class FileSplitter implements InputOperator, 
Operator.CheckpointListener
   public void committed(long l)
   {
     try {
-      windowDataManager.deleteUpTo(operatorId, l);
+      windowDataManager.committed(l);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
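
After repartitioning, a new physical partition may have to replay recovery
artifacts written by every partition of the previous run, which is what
retrieveAllPartitions(windowId) supplies (see the AbstractFileInputOperator hunk
above). A condensed sketch of that pattern follows; the class and the filtering
comment are illustrative, and only the retrieveAllPartitions call mirrors this
commit.

import java.io.IOException;
import java.util.Map;

import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.WindowDataManager;

//Hypothetical recovery helper.
public class RepartitionReplaySketch
{
  private final WindowDataManager windowDataManager = new FSWindowDataManager();

  public void replay(long windowId)
  {
    try {
      //one entry per partition of the previous run: operator id -> artifact saved for windowId
      Map<Integer, Object> recoveryDataPerOperator = windowDataManager.retrieveAllPartitions(windowId);
      for (Map.Entry<Integer, Object> entry : recoveryDataPerOperator.entrySet()) {
        //process only the records from entry.getValue() that hash to this partition
      }
    } catch (IOException e) {
      throw new RuntimeException("replay", e);
    }
  }
}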

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java 
b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
index b8b21cb..3763ef0 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
@@ -110,7 +111,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
     windowDataManager.setup(context);
     super.setup(context);
 
-    long largestRecoveryWindow = windowDataManager.getLargestRecoveryWindow();
+    long largestRecoveryWindow = windowDataManager.getLargestCompletedWindow();
     if (largestRecoveryWindow == Stateless.WINDOW_ID || 
context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) >
         largestRecoveryWindow) {
       scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
@@ -121,7 +122,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   public void beginWindow(long windowId)
   {
     super.beginWindow(windowId);
-    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -130,7 +131,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   {
     try {
       @SuppressWarnings("unchecked")
-      LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)windowDataManager.load(operatorId, windowId);
+      LinkedList<ScannedFileInfo> recoveredData = (LinkedList<ScannedFileInfo>)windowDataManager.retrieve(windowId);
       if (recoveredData == null) {
        //This could happen when there are multiple physical instances and one of them is ahead in processing windows.
         return;
@@ -151,7 +152,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
     } catch (IOException e) {
       throw new RuntimeException("replay", e);
     }
-    if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId == windowDataManager.getLargestCompletedWindow()) {
       scanner.startScanning(Collections.unmodifiableMap(referenceTimes));
     }
   }
@@ -159,7 +160,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       return;
     }
 
@@ -206,9 +207,9 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   @Override
   public void endWindow()
   {
-    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       try {
-        windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+        windowDataManager.save(currentWindowRecoveryState, currentWindowId);
       } catch (IOException e) {
         throw new RuntimeException("saving recovery", e);
       }
@@ -237,7 +238,7 @@ public class FileSplitterInput extends AbstractFileSplitter 
implements InputOper
   public void committed(long l)
   {
     try {
-      windowDataManager.deleteUpTo(operatorId, l);
+      windowDataManager.committed(l);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -274,16 +275,16 @@ public class FileSplitterInput extends 
AbstractFileSplitter implements InputOper
     private static long DEF_SCAN_INTERVAL_MILLIS = 5000;
     private static String FILE_BEING_COPIED = "_COPYING_";
 
-    private boolean recursive;
+    private boolean recursive = true;
 
     private transient volatile boolean trigger;
 
     @NotNull
     @Size(min = 1)
-    private final Set<String> files;
+    private final Set<String> files = new LinkedHashSet<>();
 
     @Min(0)
-    private long scanIntervalMillis;
+    private long scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
     private String filePatternRegularExp;
 
     private String ignoreFilePatternRegularExp;
@@ -306,19 +307,9 @@ public class FileSplitterInput extends 
AbstractFileSplitter implements InputOper
     private transient ScannedFileInfo lastScannedInfo;
     private transient int numDiscoveredPerIteration;
 
-    public TimeBasedDirectoryScanner()
-    {
-      recursive = true;
-      scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
-      files = Sets.newLinkedHashSet();
-    }
-
     @Override
     public void setup(Context.OperatorContext context)
     {
-      if (scanService != null) {
-        throw new RuntimeException("multiple calls to setup() detected!");
-      }
       scanService = Executors.newSingleThreadExecutor();
       discoveredFiles = new LinkedBlockingDeque<>();
       atomicThrowable = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
 
b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index bf0fe5c..2b8b58d 100644
--- 
a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -19,7 +19,6 @@
 package com.datatorrent.lib.io.jms;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -43,6 +42,7 @@ import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
 import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.commons.lang.mutable.MutableLong;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -54,7 +54,6 @@ import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.lib.counters.BasicCounters;
-import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * This is the base implementation of a JMS input operator.<br/>
@@ -107,7 +106,6 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
 
   @NotNull
   protected WindowDataManager windowDataManager;
-  private transient long[] operatorRecoveredWindows;
   protected transient long currentWindowId;
   protected transient int emitCount;
 
@@ -202,14 +200,6 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
     counters.setCounter(CounterKeys.RECEIVED, new MutableLong());
     counters.setCounter(CounterKeys.REDELIVERED, new MutableLong());
     windowDataManager.setup(context);
-    try {
-      operatorRecoveredWindows = windowDataManager.getWindowIds(context.getId());
-      if (operatorRecoveredWindows != null) {
-        Arrays.sort(operatorRecoveredWindows);
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("fetching windows", e);
-    }
   }
 
   /**
@@ -262,7 +252,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   public void beginWindow(long windowId)
   {
     currentWindowId = windowId;
-    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
       replay(windowId);
     }
   }
@@ -271,11 +261,17 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   {
     try {
       @SuppressWarnings("unchecked")
-      Map<String, T> recoveredData = (Map<String, T>)windowDataManager.load(context.getId(), windowId);
+      Map<String, T> recoveredData = (Map<String, T>)windowDataManager.retrieve(windowId);
       if (recoveredData == null) {
         return;
       }
       for (Map.Entry<String, T> recoveredEntry : recoveredData.entrySet()) {
+        /*
+          It is important to add the recovered message ids to the pendingAck set because there is no
+          guarantee that the acknowledgement completed after the state was persisted by windowDataManager.
+          In that case the messages are re-delivered by the message bus, so we compare each incoming
+          message against this set and ignore the re-delivered ones.
+         */
         pendingAck.add(recoveredEntry.getKey());
         emit(recoveredEntry.getValue());
       }
@@ -287,7 +283,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   @Override
   public void emitTuples()
   {
-    if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
       return;
     }
 
@@ -329,7 +325,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
         throw new RuntimeException(ie);
       }
     } else {
-      DTThrowable.rethrow(lthrowable);
+      Throwables.propagate(lthrowable);
     }
   }
 
@@ -347,10 +343,8 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   @Override
   public void endWindow()
   {
-    if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) {
+    if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
       synchronized (lock) {
-        boolean stateSaved = false;
-        boolean ackCompleted = false;
         try {
          //No more messages can be consumed now, so we will call emit tuples once more
          //so that any pending messages can be emitted.
@@ -360,33 +354,25 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
             emitCount++;
             lastMsg = msg;
           }
-          windowDataManager.save(currentWindowRecoveryState, context.getId(), currentWindowId);
-          stateSaved = true;
-
+          windowDataManager.save(currentWindowRecoveryState, currentWindowId);
           currentWindowRecoveryState.clear();
           if (lastMsg != null) {
             acknowledge();
           }
-          ackCompleted = true;
           pendingAck.clear();
         } catch (Throwable t) {
-          if (!ackCompleted) {
-            LOG.info("confirm recovery of {} for {} does not exist", context.getId(), currentWindowId, t);
-          }
-          DTThrowable.rethrow(t);
-        } finally {
-          if (stateSaved && !ackCompleted) {
-            try {
-              windowDataManager.delete(context.getId(), currentWindowId);
-            } catch (IOException e) {
-              LOG.error("unable to delete corrupted state", e);
-            }
-          }
+          /*
+            When the acknowledgement fails after the state has been persisted by windowDataManager, this
+            window is considered completed by the operator instance after recovery. However, since the
+            acknowledgement failed, the messages will be re-sent by the message bus. To address that, we
+            add the messages to the pendingAck set while replaying; when they are re-delivered, we ignore
+            any message whose id is already in the set.
+          */
+          Throwables.propagate(t);
         }
       }
       emitCount = 0; //reset emit count
-    } else if (operatorRecoveredWindows != null &&
-        currentWindowId < operatorRecoveredWindows[operatorRecoveredWindows.length - 1]) {
+    } else if (currentWindowId < windowDataManager.getLargestCompletedWindow()) {
       //pendingAck is not cleared for the last replayed window of this operator. This is because there is
       //still a chance that in the previous run the operator crashed after saving the state but before acknowledgement.
       pendingAck.clear();
@@ -417,7 +403,7 @@ public abstract class AbstractJMSInputOperator<T> extends 
JMSBase
   public void committed(long windowId)
   {
     try {
-      windowDataManager.deleteUpTo(context.getId(), windowId);
+      windowDataManager.committed(windowId);
     } catch (IOException e) {
       throw new RuntimeException("committing", e);
     }
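
The comments added above describe the at-least-once gap this operator closes: a
crash between save(...) and acknowledge() means the bus re-delivers messages
that were already emitted in a saved window. A self-contained sketch of the
resulting dedup logic follows; the class and method names are illustrative, and
only the pendingAck behavior follows the comments in this diff.

import java.util.HashSet;
import java.util.Set;

//Hypothetical consumer-side filter mirroring the pendingAck logic described above.
public class RedeliveryFilterSketch
{
  private final Set<String> pendingAck = new HashSet<>();

  //while replaying a recovered window: remember ids whose acknowledgement may not have completed
  public void recovered(String messageId)
  {
    pendingAck.add(messageId);
  }

  //for each live message: emit only if it is not a re-delivery of an already emitted message
  public boolean accept(String messageId)
  {
    return !pendingAck.contains(messageId);
  }

  //once the broker acknowledgement succeeds, the remembered ids are no longer needed
  public void acknowledged()
  {
    pendingAck.clear();
  }
}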

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
index dbf903d..d0652ef 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -145,8 +145,8 @@ public abstract class 
AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac
     super.setup(context);
     
((FileAccessFSImpl)stream1Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH)
 + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) 
+ Path.SEPARATOR + stream1State);
     
((FileAccessFSImpl)stream2Store.getFileAccess()).setBasePath(context.getValue(DAG.APPLICATION_PATH)
 + Path.SEPARATOR + stateDir + Path.SEPARATOR + String.valueOf(context.getId()) 
+ Path.SEPARATOR + stream2State);
-    stream1Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream1State);
-    stream1Store.getCheckpointManager().setRecoveryPath("managed_state_" + stream2State);
+    stream1Store.getCheckpointManager().setStatePath("managed_state_" + stream1State);
+    stream2Store.getCheckpointManager().setStatePath("managed_state_" + stream2State);
     stream1Store.setup(context);
     stream2Store.setup(context);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index 25b3f8b..1e378ec 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -202,28 +202,23 @@ public abstract class AbstractManagedStateImpl
     long activationWindow = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
 
     if (activationWindow != Stateless.WINDOW_ID) {
-      //delete all the wal files with windows > activationWindow.
       //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
       try {
-        long[] recoveredWindows = checkpointManager.getWindowIds(operatorContext.getId());
-        if (recoveredWindows != null) {
-          for (long recoveredWindow : recoveredWindows) {
-            if (recoveredWindow <= activationWindow) {
-              @SuppressWarnings("unchecked")
-              Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
-                  checkpointManager.load(operatorContext.getId(), recoveredWindow);
-              if (recoveredData != null && !recoveredData.isEmpty()) {
-                for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
-                  int bucketIdx = prepareBucket(entry.getKey());
-                  buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
-                }
-              }
-              checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
-                  true /*skipWritingToWindowFile*/);
-            } else {
-              checkpointManager.delete(operatorContext.getId(), recoveredWindow);
+
+        Map<Long, Object> statePerWindow = checkpointManager.retrieveAllWindows();
+        for (Map.Entry<Long, Object> stateEntry : statePerWindow.entrySet()) {
+          Preconditions.checkArgument(stateEntry.getKey() <= activationWindow,
+              stateEntry.getKey() + " greater than " + activationWindow);
+          @SuppressWarnings("unchecked")
+          Map<Long, Map<Slice, Bucket.BucketedValue>> state = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+              stateEntry.getValue();
+          if (state != null && !state.isEmpty()) {
+            for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> bucketEntry : state.entrySet()) {
+              int bucketIdx = prepareBucket(bucketEntry.getKey());
+              buckets[bucketIdx].recoveredData(stateEntry.getKey(), bucketEntry.getValue());
             }
           }
+          checkpointManager.save(state, stateEntry.getKey(), true /*skipWritingToWindowFile*/);
         }
       } catch (IOException e) {
         throw new RuntimeException("recovering", e);
@@ -354,7 +349,7 @@ public abstract class AbstractManagedStateImpl
     }
     if (!flashData.isEmpty()) {
       try {
-        checkpointManager.save(flashData, operatorContext.getId(), windowId, false);
+        checkpointManager.save(flashData, windowId, false);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -379,8 +374,8 @@ public abstract class AbstractManagedStateImpl
             }
           }
         }
-        checkpointManager.committed(operatorContext.getId(), windowId);
-      } catch (IOException | InterruptedException e) {
+        checkpointManager.committed(windowId);
+      } catch (IOException e) {
         throw new RuntimeException("committing " + windowId, e);
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
index 4852b50..237f4b9 100644
--- 
a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
+++ 
b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.state.managed;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
@@ -33,10 +34,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.FileSystemWAL;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Queues;
+import com.google.common.primitives.Longs;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.annotation.Stateless;
@@ -78,7 +81,8 @@ public class IncrementalCheckpointManager extends 
FSWindowDataManager
   public IncrementalCheckpointManager()
   {
     super();
-    setRecoveryPath(WAL_RELATIVE_PATH);
+    setStatePath(WAL_RELATIVE_PATH);
+    setRelyOnCheckpoints(true);
   }
 
   @Override
@@ -132,7 +136,7 @@ public class IncrementalCheckpointManager extends 
FSWindowDataManager
             
managedStateContext.getBucketsFileSystem().writeBucketData(windowId, 
singleBucket.getKey(),
                 singleBucket.getValue());
           }
-          storageAgent.delete(managedStateContext.getOperatorContext().getId(), windowId);
+          committed(windowId);
         } catch (Throwable t) {
           throwable.set(t);
           LOG.debug("transfer window {}", windowId, t);
@@ -150,7 +154,7 @@ public class IncrementalCheckpointManager extends 
FSWindowDataManager
   }
 
   @Override
-  public void save(Object object, int operatorId, long windowId) throws IOException
+  public void save(Object object, long windowId) throws IOException
   {
     throw new UnsupportedOperationException("doesn't support saving any object");
   }
@@ -159,15 +163,13 @@ public class IncrementalCheckpointManager extends 
FSWindowDataManager
    * The unsaved state combines data received in multiple windows. This window data manager persists
    * this data on disk, keyed by the window id in which the save was requested.
    * @param unsavedData   un-saved data of all buckets.
-   * @param operatorId    operator id.
    * @param windowId      window id.
    * @param skipWriteToWindowFile flag that enables/disables saving the window file.
    *
    * @throws IOException
    */
-  public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, int operatorId, long windowId,
-      boolean skipWriteToWindowFile)
-      throws IOException
+  public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, long windowId,
+      boolean skipWriteToWindowFile) throws IOException
   {
     Throwable lthrowable;
     if ((lthrowable = throwable.get()) != null) {
@@ -177,25 +179,46 @@ public class IncrementalCheckpointManager extends 
FSWindowDataManager
     savedWindows.put(windowId, unsavedData);
 
     if (!skipWriteToWindowFile) {
-      super.save(unsavedData, operatorId, windowId);
+      super.save(unsavedData, windowId);
     }
   }
 
   /**
+   * Retrieves artifacts available for all the windows saved by the enclosing partitions.
+   * @return  artifact saved per window.
+   * @throws IOException
+   */
+  public Map<Long, Object> retrieveAllWindows() throws IOException
+  {
+    Map<Long, Object> artifactPerWindow = new HashMap<>();
+    FileSystemWAL.FileSystemWALReader reader = getWal().getReader();
+    reader.seek(getWal().getWalStartPointer());
+    
+    Slice windowSlice = readNext(reader);
+    while (reader.getCurrentPointer().compareTo(getWal().getWalEndPointerAfterRecovery()) < 0 && windowSlice != null) {
+      long window = Longs.fromByteArray(windowSlice.toByteArray());
+      Object data = fromSlice(readNext(reader));
+      artifactPerWindow.put(window, data);
+      windowSlice = readNext(reader); //null or next window
+    }
+    reader.seek(getWal().getWalStartPointer());
+    return artifactPerWindow;
+  }
+
+  /**
    * Transfers the data which has been committed till windowId to data files.
    *
-   * @param operatorId operator id
-   * @param windowId   window id
+   * @param committedWindowId   window id
    */
-  @SuppressWarnings("UnusedParameters")
-  protected void committed(int operatorId, long windowId) throws IOException, InterruptedException
+  @Override
+  public void committed(long committedWindowId) throws IOException
   {
-    LOG.debug("data manager committed {}", windowId);
+    LOG.debug("data manager committed {}", committedWindowId);
     for (Long currentWindow : savedWindows.keySet()) {
       if (currentWindow <= largestWindowAddedToTransferQueue) {
         continue;
       }
-      if (currentWindow <= windowId) {
+      if (currentWindow <= committedWindowId) {
         LOG.debug("to transfer {}", currentWindow);
         largestWindowAddedToTransferQueue = currentWindow;
         windowsToTransfer.add(currentWindow);
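
The retrieveAllWindows() method added above reads the recovered WAL pairwise:
each pair of records is a window id followed by the artifact saved for that
window, until the end pointer established during recovery. Below is a toy model
of that pairwise layout; this is not Malhar code and only illustrates the
record pairing.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

//Toy model: records alternate between a window id and that window's artifact.
public class PairwiseWalReadSketch
{
  public static Map<Long, Object> readAll(Deque<Object> walRecords)
  {
    Map<Long, Object> artifactPerWindow = new TreeMap<>();
    while (!walRecords.isEmpty()) {
      long window = (Long)walRecords.poll(); //first record of the pair: window id
      Object data = walRecords.poll();       //second record: state saved in that window
      artifactPerWindow.put(window, data);
    }
    return artifactPerWindow;
  }

  public static void main(String[] args)
  {
    Deque<Object> wal = new ArrayDeque<>();
    wal.add(1L);
    wal.add("state-of-window-1");
    wal.add(2L);
    wal.add("state-of-window-2");
    System.out.println(readAll(wal)); //prints {1=state-of-window-1, 2=state-of-window-2}
  }
}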
