Repository: apex-malhar
Updated Branches:
  refs/heads/master cae42df3c -> 833cbc251


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java 
b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index 1b8efff..e5a2832 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.lib.io.fs;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -30,7 +29,6 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
@@ -44,9 +42,6 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.Sets;
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
@@ -106,56 +101,75 @@ public class FileSplitterInputTest
       fileSplitterInput = new FileSplitterInput();
       fileSplitterInput.setBlocksThreshold(100);
       scanner = new MockScanner();
+      scanner.setScanIntervalMillis(500);
+      scanner.setFilePatternRegularExp(".*[.]txt");
+      scanner.setFiles(dataDirectory);
       fileSplitterInput.setScanner(scanner);
-      fileSplitterInput.getScanner().setScanIntervalMillis(500);
-      fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
-      fileSplitterInput.getScanner().setFiles(dataDirectory);
 
       Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
       attributes.put(Context.DAGContext.APPLICATION_PATH,
           "target/" + className + "/" + methodName + "/" + 
Long.toHexString(System.currentTimeMillis()));
 
       context = new OperatorContextTestHelper.TestIdOperatorContext(0, 
attributes);
-      fileSplitterInput.setup(context);
-
       fileMetadataSink = new CollectorTestSink<>();
-      TestUtils.setSink(fileSplitterInput.filesMetadataOutput, 
fileMetadataSink);
-
       blockMetadataSink = new CollectorTestSink<>();
-      TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, 
blockMetadataSink);
+      resetSinks();
     }
 
     @Override
     protected void finished(Description description)
     {
       filePaths.clear();
-      this.fileSplitterInput.teardown();
       TestUtils.deleteTargetTestClassFolder(description);
     }
+
+    private void resetSinks()
+    {
+      TestUtils.setSink(fileSplitterInput.filesMetadataOutput, 
fileMetadataSink);
+      TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, 
blockMetadataSink);
+    }
+
+    private void updateConfig(FSWindowDataManager fsWindowDataManager,
+        long scanInterval, long blockSize, int blocksThreshold)
+    {
+      fileSplitterInput.setWindowDataManager(fsWindowDataManager);
+      fileSplitterInput.getScanner().setScanIntervalMillis(scanInterval);
+      fileSplitterInput.setBlockSize(blockSize);
+      fileSplitterInput.setBlocksThreshold(blocksThreshold);
+    }
   }
 
   @Rule
   public TestMeta testMeta = new TestMeta();
-
-  @Test
-  public void testFileMetadata() throws InterruptedException
+  
+  private void window1TestHelper() throws InterruptedException
   {
     testMeta.fileSplitterInput.beginWindow(1);
     testMeta.scanner.semaphore.acquire();
 
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
+
     Assert.assertEquals("File metadata", 12, 
testMeta.fileMetadataSink.collectedTuples.size());
+
     for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) {
       FileSplitterInput.FileMetadata metadata = 
(FileSplitterInput.FileMetadata)fileMetadata;
       Assert.assertTrue("path: " + metadata.getFilePath(), 
testMeta.filePaths.contains(metadata.getFilePath()));
       Assert.assertNotNull("name: ", metadata.getFileName());
     }
-
+    
     testMeta.fileMetadataSink.collectedTuples.clear();
   }
 
   @Test
+  public void testFileMetadata() throws InterruptedException
+  {
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    window1TestHelper();
+    testMeta.fileSplitterInput.teardown();
+  }
+
+  @Test
   public void testScannerFilterForDuplicates() throws InterruptedException
   {
     String filePath = testMeta.dataDirectory + Path.SEPARATOR + "file0.txt";
@@ -164,6 +178,7 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
     
testMeta.fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt");
     testMeta.fileSplitterInput.getScanner().setFiles(filePath);
+
     testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
     testMeta.scanner.semaphore.acquire();
@@ -185,11 +200,13 @@ public class FileSplitterInputTest
     }
 
     testMeta.fileMetadataSink.collectedTuples.clear();
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testBlockMetadataNoSplit() throws InterruptedException
   {
+    testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
     testMeta.scanner.semaphore.acquire();
 
@@ -199,12 +216,15 @@ public class FileSplitterInputTest
       BlockMetadata.FileBlockMetadata metadata = 
(BlockMetadata.FileBlockMetadata)blockMetadata;
       Assert.assertTrue("path: " + metadata.getFilePath(), 
testMeta.filePaths.contains(metadata.getFilePath()));
     }
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testBlockMetadataWithSplit() throws InterruptedException
   {
     testMeta.fileSplitterInput.setBlockSize(2L);
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
     testMeta.scanner.semaphore.acquire();
 
@@ -218,6 +238,7 @@ public class FileSplitterInputTest
       noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
     }
     Assert.assertEquals("Blocks", noOfBlocks, 
testMeta.blockMetadataSink.collectedTuples.size());
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
@@ -226,25 +247,31 @@ public class FileSplitterInputTest
     FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager();
     
testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager);
 
-    fsIdempotentStorageManager.setup(testMeta.context);
+    testMeta.fileSplitterInput.setup(testMeta.context);
     //will emit window 1 from data directory
-    testFileMetadata();
+    window1TestHelper();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
+    testMeta.fileSplitterInput.teardown();
 
-    fsIdempotentStorageManager.setup(testMeta.context);
+    testMeta.fileSplitterInput = 
KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
+    testMeta.resetSinks();
+    
+    testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
     Assert.assertEquals("Blocks", 12, 
testMeta.blockMetadataSink.collectedTuples.size());
     for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) {
       BlockMetadata.FileBlockMetadata metadata = 
(BlockMetadata.FileBlockMetadata)blockMetadata;
       Assert.assertTrue("path: " + metadata.getFilePath(), 
testMeta.filePaths.contains(metadata.getFilePath()));
     }
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testTimeScan() throws InterruptedException, IOException, 
TimeoutException
   {
-    testFileMetadata();
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    window1TestHelper();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
@@ -265,13 +292,16 @@ public class FileSplitterInputTest
 
     Assert.assertEquals("window 2: files", 1, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("window 2: blocks", 1, 
testMeta.blockMetadataSink.collectedTuples.size());
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testTrigger() throws InterruptedException, IOException, 
TimeoutException
   {
     testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
-    testFileMetadata();
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    window1TestHelper();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
@@ -293,51 +323,55 @@ public class FileSplitterInputTest
 
     Assert.assertEquals("window 2: files", 1, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("window 2: blocks", 1, 
testMeta.blockMetadataSink.collectedTuples.size());
+    testMeta.fileSplitterInput.teardown();
   }
-
-  @Test
-  public void testBlocksThreshold() throws InterruptedException
+  
+  private void blocksTestHelper() throws InterruptedException
   {
-    int noOfBlocks = 0;
-    for (int i = 0; i < 12; i++) {
-      File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
-      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
-    }
-
-    testMeta.fileSplitterInput.setBlockSize(2L);
-    testMeta.fileSplitterInput.setBlocksThreshold(10);
     testMeta.fileSplitterInput.beginWindow(1);
-
     testMeta.scanner.semaphore.acquire();
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
-
+    
     Assert.assertEquals("Blocks", 10, 
testMeta.blockMetadataSink.collectedTuples.size());
-
+    
     for (int window = 2; window < 8; window++) {
       testMeta.fileSplitterInput.beginWindow(window);
       testMeta.fileSplitterInput.emitTuples();
       testMeta.fileSplitterInput.endWindow();
     }
+    
+    int noOfBlocks = 0;
+    for (int i = 0; i < 12; i++) {
+      File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt");
+      noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0));
+    }
 
     Assert.assertEquals("Files", 12, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("Blocks", noOfBlocks, 
testMeta.blockMetadataSink.collectedTuples.size());
   }
 
   @Test
-  public void testIdempotencyWithBlocksThreshold() throws InterruptedException
+  public void testBlocksThreshold() throws InterruptedException
   {
-    FSWindowDataManager fsWindowDataManager = new FSWindowDataManager();
-    testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager);
+    testMeta.fileSplitterInput.setBlockSize(2L);
     testMeta.fileSplitterInput.setBlocksThreshold(10);
-    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
-    fsWindowDataManager.setup(testMeta.context);
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    blocksTestHelper();
+    testMeta.fileSplitterInput.teardown();
+  }
 
-    testBlocksThreshold();
+  private void recoveryTestHelper() throws InterruptedException
+  {
+    blocksTestHelper();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
+    testMeta.fileSplitterInput.teardown();
+
+    testMeta.fileSplitterInput = 
KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
+    testMeta.resetSinks();
 
-    fsWindowDataManager.setup(testMeta.context);
+    testMeta.fileSplitterInput.setup(testMeta.context);
     for (int i = 1; i < 8; i++) {
       testMeta.fileSplitterInput.beginWindow(i);
     }
@@ -346,9 +380,25 @@ public class FileSplitterInputTest
   }
 
   @Test
+  public void testIdempotencyWithBlocksThreshold() throws InterruptedException
+  {
+    FSWindowDataManager fsWindowDataManager = new FSWindowDataManager();
+    testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10);
+    
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    recoveryTestHelper();
+    testMeta.fileSplitterInput.teardown();
+  }
+
+  @Test
   public void testFirstWindowAfterRecovery() throws IOException, 
InterruptedException
   {
-    testIdempotencyWithBlocksThreshold();
+    FSWindowDataManager fsWindowDataManager = new FSWindowDataManager();
+    testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10);
+    testMeta.fileSplitterInput.setup(testMeta.context);
+
+    recoveryTestHelper();
+    
     Thread.sleep(1000);
     HashSet<String> lines = Sets.newHashSet();
     for (int line = 2; line < 4; line++) {
@@ -362,34 +412,28 @@ public class FileSplitterInputTest
     testMeta.blockMetadataSink.clear();
 
     testMeta.fileSplitterInput.beginWindow(8);
-    testMeta.scanner.semaphore.acquire();
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
     Assert.assertEquals("Files", 1, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("Blocks", 6, 
testMeta.blockMetadataSink.collectedTuples.size());
+    testMeta.fileSplitterInput.teardown();
   }
 
-  @Ignore
+  @Test
   public void testRecoveryOfPartialFile() throws InterruptedException
   {
     FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager();
-    
testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager);
-    testMeta.fileSplitterInput.setBlockSize(2L);
-    testMeta.fileSplitterInput.setBlocksThreshold(2);
-    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+    testMeta.updateConfig(fsIdempotentStorageManager, 500L, 2L, 2);
 
-    Kryo kryo = new Kryo();
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    Output loutput = new Output(bos);
-    kryo.writeObject(loutput, testMeta.fileSplitterInput);
-    loutput.close();
+    FileSplitterInput checkpointedInput = 
KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
 
     testMeta.fileSplitterInput.setup(testMeta.context);
 
     testMeta.fileSplitterInput.beginWindow(1);
 
-    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    testMeta.scanner.semaphore.acquire();
     testMeta.fileSplitterInput.emitTuples();
     testMeta.fileSplitterInput.endWindow();
 
@@ -403,11 +447,8 @@ public class FileSplitterInputTest
     testMeta.fileSplitterInput.teardown();
 
     //there was a failure and the operator was re-deployed
-    Input lInput = new Input(bos.toByteArray());
-    testMeta.fileSplitterInput = kryo.readObject(lInput, 
testMeta.fileSplitterInput.getClass());
-    lInput.close();
-    TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, 
testMeta.blockMetadataSink);
-    TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, 
testMeta.fileMetadataSink);
+    testMeta.fileSplitterInput = checkpointedInput;
+    testMeta.resetSinks();
 
     testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
@@ -440,13 +481,14 @@ public class FileSplitterInputTest
         
testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1));
     Assert.assertTrue("Block file name 1",
         
testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2));
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testRecursive() throws InterruptedException, IOException
   {
-    testMeta.fileSplitterInput.getScanner().regex = null;
-    testFileMetadata();
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    window1TestHelper();
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
@@ -467,14 +509,13 @@ public class FileSplitterInputTest
 
     Assert.assertEquals("window 2: files", 2, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("window 2: blocks", 1, 
testMeta.blockMetadataSink.collectedTuples.size());
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testSingleFile() throws InterruptedException, IOException
   {
-    testMeta.fileSplitterInput.teardown();
     testMeta.fileSplitterInput.setScanner(new MockScanner());
-    testMeta.fileSplitterInput.getScanner().regex = null;
     testMeta.fileSplitterInput.getScanner().setFiles(testMeta.dataDirectory + 
"/file1.txt");
 
     testMeta.fileSplitterInput.setup(testMeta.context);
@@ -486,21 +527,16 @@ public class FileSplitterInputTest
     Assert.assertEquals("File metadata count", 1, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + 
"/file1.txt").getAbsolutePath(),
         testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testRecoveryOfBlockMetadataIterator() throws InterruptedException
   {
     FSWindowDataManager fsWindowDataManager = new FSWindowDataManager();
-
-    testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager);
-    testMeta.fileSplitterInput.setBlockSize(2L);
-    testMeta.fileSplitterInput.setBlocksThreshold(2);
-    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
-
-
-    fsWindowDataManager.setup(testMeta.context);
-
+    testMeta.updateConfig(fsWindowDataManager, 500L, 2L, 2);
+    
+    testMeta.fileSplitterInput.setup(testMeta.context);
     testMeta.fileSplitterInput.beginWindow(1);
 
     ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
@@ -527,13 +563,17 @@ public class FileSplitterInputTest
 
     Assert.assertEquals("Recovered Files", 1, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("Recovered Blocks", 2, 
testMeta.blockMetadataSink.collectedTuples.size());
+
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testFileModificationTest() throws InterruptedException, 
IOException, TimeoutException
   {
     testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000);
-    testFileMetadata();
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    window1TestHelper();
+
     testMeta.fileMetadataSink.clear();
     testMeta.blockMetadataSink.clear();
 
@@ -573,13 +613,13 @@ public class FileSplitterInputTest
 
     Assert.assertEquals("window 2: files", 0, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("window 2: blocks", 0, 
testMeta.blockMetadataSink.collectedTuples.size());
-
+    
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testMultipleNestedInput() throws IOException, 
InterruptedException
   {
-    testMeta.fileSplitterInput.teardown();
     File subDir = new File(testMeta.dataDirectory, "subDir");
     subDir.mkdir();
     File file = new File(subDir, "file.txt");
@@ -612,12 +652,12 @@ public class FileSplitterInputTest
     }
 
     testMeta.fileMetadataSink.collectedTuples.clear();
+    testMeta.fileSplitterInput.teardown();
   }
 
   @Test
   public void testEmptyDirCopy() throws InterruptedException
   {
-    testMeta.fileSplitterInput.teardown();
     File emptyDir = new File(testMeta.dataDirectory, "emptyDir");
     emptyDir.mkdirs();
     testMeta.fileSplitterInput.setScanner(new MockScanner());
@@ -632,6 +672,7 @@ public class FileSplitterInputTest
     Assert.assertEquals("File metadata count", 1, 
testMeta.fileMetadataSink.collectedTuples.size());
     Assert.assertEquals("Empty directory not copied.", emptyDir.getName(),
         testMeta.fileMetadataSink.collectedTuples.get(0).getFileName());
+    testMeta.fileSplitterInput.teardown();
   }
 
   private static class MockScanner extends 
FileSplitterInput.TimeBasedDirectoryScanner
@@ -653,6 +694,6 @@ public class FileSplitterInputTest
       super.scanIterationComplete();
     }
   }
-
+  
   private static final Logger LOG = 
LoggerFactory.getLogger(FileSplitterInputTest.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java 
b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
index 01febe3..24ab938 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java
@@ -346,7 +346,7 @@ public class FileSplitterTest
   public void testRecoveryOfPartialFile() throws InterruptedException
   {
     FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager();
-    fsIdempotentStorageManager.setRecoveryPath(testMeta.dataDirectory + '/' + 
"recovery");
+    fsIdempotentStorageManager.setStatePath(testMeta.dataDirectory + '/' + 
"recovery");
     testMeta.fileSplitter.setWindowDataManager(fsIdempotentStorageManager);
     testMeta.fileSplitter.setBlockSize(2L);
     testMeta.fileSplitter.setBlocksThreshold(2);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
 
b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
index 82f1c67..f0b44e6 100644
--- 
a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
@@ -23,7 +23,6 @@ import java.io.File;
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
@@ -41,7 +40,6 @@ import org.apache.commons.io.FileUtils;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
-import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 
@@ -127,7 +125,7 @@ public class JMSStringInputOperatorTest
     testMeta.operator.activate(testMeta.context);
 
     Assert.assertEquals("largest recovery window", 1,
-        testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+        testMeta.operator.getWindowDataManager().getLargestCompletedWindow());
 
     testMeta.operator.beginWindow(1);
     testMeta.operator.endWindow();
@@ -135,39 +133,6 @@ public class JMSStringInputOperatorTest
     testMeta.sink.collectedTuples.clear();
   }
 
-  @Test
-  public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception
-  {
-    testMeta.operator = new JMSStringInputOperator()
-    {
-      @Override
-      protected void acknowledge() throws JMSException
-      {
-        throw new RuntimeException("fail ack");
-      }
-    };
-    testMeta.operator.setSubject("TEST.FOO");
-    
testMeta.operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL,
 "vm://localhost");
-
-    testMeta.operator.setup(testMeta.context);
-    testMeta.operator.activate(testMeta.context);
-
-    produceMsg(10);
-    Thread.sleep(1000);
-    testMeta.operator.beginWindow(1);
-    testMeta.operator.emitTuples();
-    try {
-      testMeta.operator.endWindow();
-    } catch (Throwable t) {
-      LOG.debug("ack failed");
-    }
-    testMeta.operator.setup(testMeta.context);
-    testMeta.operator.activate(testMeta.context);
-
-    Assert.assertEquals("window 1 should not exist", Stateless.WINDOW_ID,
-        testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
-  }
-
   private void produceMsg(int numMessages) throws Exception
   {
     // Create a ConnectionFactory

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
 
b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
index 53e787d..9b7083d 100644
--- 
a/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
+++ 
b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
@@ -197,7 +197,7 @@ public class SQSStringInputOperatorTest
     testMeta.operator.activate(testMeta.context);
 
     Assert.assertEquals("largest recovery window", 1,
-        testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+        testMeta.operator.getWindowDataManager().getLargestCompletedWindow());
 
     testMeta.operator.beginWindow(1);
     testMeta.operator.endWindow();
@@ -254,7 +254,7 @@ public class SQSStringInputOperatorTest
     testMeta.operator.activate(testMeta.context);
 
     Assert.assertEquals("window 1 should exist", 1,
-        testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+        testMeta.operator.getWindowDataManager().getLargestCompletedWindow());
   }
 
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
index 2e35b0a..ce8052a 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -98,14 +98,15 @@ public class IncrementalCheckpointManagerTest
   {
     testMeta.checkpointManager.setup(testMeta.managedStateContext);
     Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = 
ManagedStateTestUtils.getTestData(0, 5, 0);
-    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.save(buckets5, 10, false);
     testMeta.checkpointManager.teardown();
 
-    testMeta.checkpointManager = new IncrementalCheckpointManager();
+    KryoCloneUtils<IncrementalCheckpointManager> cloneUtils = 
KryoCloneUtils.createCloneUtils(testMeta.checkpointManager);
+    testMeta.checkpointManager = cloneUtils.getClone();
     testMeta.checkpointManager.setup(testMeta.managedStateContext);
     @SuppressWarnings("unchecked")
     Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5After = (Map<Long, 
Map<Slice, Bucket.BucketedValue>>)
-        testMeta.checkpointManager.load(testMeta.operatorId, 10);
+        testMeta.checkpointManager.retrieve(10);
 
     Assert.assertEquals("saved", buckets5, buckets5After);
     testMeta.checkpointManager.teardown();
@@ -117,12 +118,12 @@ public class IncrementalCheckpointManagerTest
     testMeta.checkpointManager.setup(testMeta.managedStateContext);
 
     Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = 
ManagedStateTestUtils.getTestData(0, 5, 0);
-    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.save(buckets5, 10, false);
     //Need to synchronously call transfer window files so shutting down the 
other thread.
     testMeta.checkpointManager.teardown();
     Thread.sleep(500);
 
-    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    testMeta.checkpointManager.committed(10);
     testMeta.checkpointManager.transferWindowFiles();
 
     for (int i = 0; i < 5; i++) {
@@ -143,8 +144,8 @@ public class IncrementalCheckpointManagerTest
     testMeta.checkpointManager.setup(testMeta.managedStateContext);
 
     Map<Long, Map<Slice, Bucket.BucketedValue>> data = 
ManagedStateTestUtils.getTestData(0, 5, 0);
-    testMeta.checkpointManager.save(data, testMeta.operatorId, 10, false);
-    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    testMeta.checkpointManager.save(data, 10, false);
+    testMeta.checkpointManager.committed(10);
     latch.await();
     testMeta.checkpointManager.teardown();
     Thread.sleep(500);
@@ -183,8 +184,8 @@ public class IncrementalCheckpointManagerTest
     }
 
     @Override
-    protected void writeBucketData(long windowId, long bucketId, Map<Slice, 
Bucket.BucketedValue> data)
-        throws IOException
+    protected void writeBucketData(long windowId, long bucketId, Map<Slice,
+        Bucket.BucketedValue> data) throws IOException
     {
       super.writeBucketData(windowId, bucketId, data);
       if (windowId == 10) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
 
b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
index 21d5b76..9939bb9 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java
@@ -20,9 +20,8 @@ package org.apache.apex.malhar.lib.wal;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
-import java.util.TreeSet;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -30,13 +29,6 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
@@ -44,6 +36,7 @@ import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.Pair;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.util.TestUtils;
 
@@ -56,20 +49,17 @@ public class FSWindowDataManagerTest
   {
 
     String applicationPath;
-    FSWindowDataManager storageManager;
-    Context.OperatorContext context;
+    Attribute.AttributeMap.DefaultAttributeMap attributes;
 
     @Override
     protected void starting(Description description)
     {
       TestUtils.deleteTargetTestClassFolder(description);
       super.starting(description);
-      storageManager = new FSWindowDataManager();
       applicationPath = "target/" + description.getClassName() + "/" + 
description.getMethodName();
 
-      Attribute.AttributeMap.DefaultAttributeMap attributes = new 
Attribute.AttributeMap.DefaultAttributeMap();
+      attributes = new Attribute.AttributeMap.DefaultAttributeMap();
       attributes.put(DAG.APPLICATION_PATH, applicationPath);
-      context = new OperatorContextTestHelper.TestIdOperatorContext(1, 
attributes);
     }
 
     @Override
@@ -85,31 +75,39 @@ public class FSWindowDataManagerTest
   @Test
   public void testLargestRecoveryWindow()
   {
-    testMeta.storageManager.setup(testMeta.context);
-    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, 
testMeta.storageManager.getLargestRecoveryWindow());
-    testMeta.storageManager.teardown();
+    Pair<Context.OperatorContext, FSWindowDataManager> pair = 
createManagerAndContextFor(1);
+    pair.second.setup(pair.first);
+    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, 
pair.second.getLargestCompletedWindow());
+    pair.second.teardown();
   }
 
   @Test
   public void testSave() throws IOException
   {
-    testMeta.storageManager.setup(testMeta.context);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair = 
createManagerAndContextFor(1);
+    pair.second.setup(pair.first);
     Map<Integer, String> data = Maps.newHashMap();
     data.put(1, "one");
     data.put(2, "two");
     data.put(3, "three");
-    testMeta.storageManager.save(data, 1, 1);
-    testMeta.storageManager.setup(testMeta.context);
+    pair.second.save(data, 1);
+
+    pair.second.setup(pair.first);
     @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded = (Map<Integer, 
String>)testMeta.storageManager.load(1, 1);
-    Assert.assertEquals("dataOf1", data, decoded);
-    testMeta.storageManager.teardown();
+    Map<Integer, String> artifact = (Map<Integer, 
String>)pair.second.retrieve(1);
+    Assert.assertEquals("dataOf1", data, artifact);
+    pair.second.teardown();
   }
 
   @Test
-  public void testLoad() throws IOException
+  public void testRetrieve() throws IOException
   {
-    testMeta.storageManager.setup(testMeta.context);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = 
createManagerAndContextFor(1);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = 
createManagerAndContextFor(2);
+
+    pair1.second.setup(pair1.first);
+    pair2.second.setup(pair2.first);
+
     Map<Integer, String> dataOf1 = Maps.newHashMap();
     dataOf1.put(1, "one");
     dataOf1.put(2, "two");
@@ -120,25 +118,30 @@ public class FSWindowDataManagerTest
     dataOf2.put(5, "five");
     dataOf2.put(6, "six");
 
-    testMeta.storageManager.save(dataOf1, 1, 1);
-    testMeta.storageManager.save(dataOf2, 2, 1);
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
-    Assert.assertEquals("no of states", 2, decodedStates.size());
-    for (Integer operatorId : decodedStates.keySet()) {
-      if (operatorId == 1) {
-        Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
-      } else {
-        Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
-      }
-    }
-    testMeta.storageManager.teardown();
+    pair1.second.save(dataOf1, 1);
+    pair2.second.save(dataOf2, 1);
+
+    pair1.second.setup(pair1.first);
+    Object artifact1 = pair1.second.retrieve(1);
+    Assert.assertEquals("data of 1", dataOf1, artifact1);
+
+    pair2.second.setup(pair2.first);
+    Object artifact2 = pair2.second.retrieve(1);
+    Assert.assertEquals("data of 2", dataOf2, artifact2);
+
+    pair1.second.teardown();
+    pair2.second.teardown();
   }
 
   @Test
-  public void testRecovery() throws IOException
+  public void testRetrieveAllPartitions() throws IOException
   {
-    testMeta.storageManager.setup(testMeta.context);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = 
createManagerAndContextFor(1);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = 
createManagerAndContextFor(2);
+
+    pair1.second.setup(pair1.first);
+    pair2.second.setup(pair2.first);
+
     Map<Integer, String> dataOf1 = Maps.newHashMap();
     dataOf1.put(1, "one");
     dataOf1.put(2, "two");
@@ -149,37 +152,103 @@ public class FSWindowDataManagerTest
     dataOf2.put(5, "five");
     dataOf2.put(6, "six");
 
-    testMeta.storageManager.save(dataOf1, 1, 1);
-    testMeta.storageManager.save(dataOf2, 2, 2);
+    pair1.second.save(dataOf1, 1);
+    pair2.second.save(dataOf2, 1);
+
+    pair1.second.teardown();
+    pair2.second.teardown();
 
-    testMeta.storageManager.setup(testMeta.context);
-    Assert.assertEquals("largest recovery window", 2, 
testMeta.storageManager.getLargestRecoveryWindow());
-    testMeta.storageManager.teardown();
+    List<WindowDataManager> managers = pair1.second.partition(3, null);
+
+    managers.get(0).setup(pair1.first);
+    Map<Integer, Object> artifacts = managers.get(0).retrieveAllPartitions(1);
+    Assert.assertEquals("num artifacts", 2, artifacts.size());
+
+    Assert.assertEquals("artifact 1", dataOf1, artifacts.get(1));
+    Assert.assertEquals("artifact 2", dataOf2, artifacts.get(2));
+
+    managers.get(0).teardown();
   }
 
   @Test
-  public void testGetWindowIds() throws IOException
+  public void testRecovery() throws IOException
   {
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, String> data = Maps.newHashMap();
-    data.put(1, "one");
-    data.put(2, "two");
-    data.put(3, "three");
+    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = 
createManagerAndContextFor(1);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = 
createManagerAndContextFor(2);
+
+    pair1.second.setup(pair1.first);
+    pair2.second.setup(pair2.first);
+
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
 
-    testMeta.storageManager.save(data, 1, 1);
-    testMeta.storageManager.save(data, 2, 2);
+    pair1.second.save(dataOf1, 1);
+    pair2.second.save(dataOf2, 2);
 
-    testMeta.storageManager.setup(testMeta.context);
+    pair1.second.setup(pair1.first);
+    Assert.assertEquals("largest recovery window", 1, 
pair1.second.getLargestCompletedWindow());
 
-    Assert.assertArrayEquals(new long[] {1, 2}, 
testMeta.storageManager.getWindowIds());
+    pair2.second.setup(pair2.first);
+    Assert.assertEquals("largest recovery window", 2, 
pair2.second.getLargestCompletedWindow());
 
-    testMeta.storageManager.teardown();
-  }
+    pair1.second.teardown();
+    pair2.second.teardown();
 
+    WindowDataManager manager = pair1.second.partition(1, 
Sets.newHashSet(2)).get(0);
+    manager.setup(pair1.first);
+    Assert.assertEquals("largest recovery window", 1, 
manager.getLargestCompletedWindow());
+    manager.teardown();
+  }
+  
   @Test
   public void testDelete() throws IOException
   {
-    testMeta.storageManager.setup(testMeta.context);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = 
createManagerAndContextFor(1);
+    pair1.second.getWal().setMaxLength(2);
+    pair1.second.setup(pair1.first);
+    
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+    
+    for (int i = 1; i <= 9; ++i) {
+      pair1.second.save(dataOf1, i);
+    }
+    
+    pair1.second.committed(3);
+    pair1.second.teardown();
+    
+    Pair<Context.OperatorContext, FSWindowDataManager> pair1AfterRecovery = 
createManagerAndContextFor(1);
+    testMeta.attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+    pair1AfterRecovery.second.setup(pair1AfterRecovery.first);
+    
+    Assert.assertEquals("window 1 deleted", null, 
pair1AfterRecovery.second.retrieve(1));
+    Assert.assertEquals("window 3 deleted", null, 
pair1AfterRecovery.second.retrieve(3));
+    
+    Assert.assertEquals("window 4 exists", dataOf1, 
pair1AfterRecovery.second.retrieve(4));
+    pair1.second.teardown();
+  }
+
+  @Test
+  public void testDeleteDoesNotRemoveTmpFiles() throws IOException
+  {
+    Pair<Context.OperatorContext, FSWindowDataManager> pair1 = 
createManagerAndContextFor(1);
+    pair1.second.setup(pair1.first);
+
+    Pair<Context.OperatorContext, FSWindowDataManager> pair2 = 
createManagerAndContextFor(2);
+    pair2.second.setup(pair2.first);
+
+    Pair<Context.OperatorContext, FSWindowDataManager> pair3 = 
createManagerAndContextFor(3);
+    pair3.second.setup(pair3.first);
+
     Map<Integer, String> dataOf1 = Maps.newHashMap();
     dataOf1.put(1, "one");
     dataOf1.put(2, "two");
@@ -196,41 +265,65 @@ public class FSWindowDataManagerTest
     dataOf2.put(9, "nine");
 
     for (int i = 1; i <= 9; ++i) {
-      testMeta.storageManager.save(dataOf1, 1, i);
+      pair1.second.save(dataOf1, i);
+    }
+
+    for (int i = 1; i <= 6; ++i) {
+      pair2.second.save(dataOf2, i);
     }
 
-    testMeta.storageManager.save(dataOf2, 2, 1);
-    testMeta.storageManager.save(dataOf3, 3, 1);
-
-    
testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
-        Sets.newHashSet(2, 3));
-    testMeta.storageManager.setup(testMeta.context);
-    testMeta.storageManager.deleteUpTo(1, 6);
-
-    Path appPath = new Path(testMeta.applicationPath + '/' + 
testMeta.storageManager.getRecoveryPath());
-    FileSystem fs = FileSystem.newInstance(appPath.toUri(), new 
Configuration());
-    FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, 
Integer.toString(1)));
-    Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
-    TreeSet<String> windows = Sets.newTreeSet();
-    for (FileStatus fileStatus : fileStatuses) {
-      windows.add(fileStatus.getPath().getName());
+    for (int i = 1; i <= 3; ++i) {
+      pair3.second.save(dataOf3, i);
     }
-    Assert.assertEquals("window list for 1", 
Sets.newTreeSet(Arrays.asList("7", "8", "9")), windows);
-    Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, 
Integer.toString(2))));
-    Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, 
Integer.toString(3))));
-    testMeta.storageManager.teardown();
+
+    pair1.second.teardown();
+    pair2.second.teardown();
+    pair3.second.teardown();
+
+    FSWindowDataManager fsManager = 
(FSWindowDataManager)pair1.second.partition(1, Sets.newHashSet(2, 3)).get(0);
+    fsManager.setup(pair1.first);
+
+    Assert.assertEquals("recovery window", 3, 
fsManager.getLargestCompletedWindow());
+
+    Map<Integer, Object> artifacts = fsManager.retrieveAllPartitions(1);
+    Assert.assertEquals("num artifacts", 3, artifacts.size());
+
+    fsManager.committed(3);
+    fsManager.teardown();
+
+    testMeta.attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 3L);
+    fsManager.setup(pair1.first);
+    Assert.assertEquals("recovery window", Stateless.WINDOW_ID, 
fsManager.getLargestCompletedWindow());
+    fsManager.teardown();
   }
 
   @Test
   public void testAbsoluteRecoveryPath() throws IOException
   {
-    testMeta.storageManager.setRecoveryPathRelativeToAppPath(false);
+    Pair<Context.OperatorContext, FSWindowDataManager> pair = 
createManagerAndContextFor(1);
+    pair.second.setStatePathRelativeToAppPath(false);
     long time = System.currentTimeMillis();
-    testMeta.storageManager.setRecoveryPath("target/" + time);
-    testSave();
+    pair.second.setStatePath("target/" + time);
+
+    pair.second.setup(pair.first);
+    Map<Integer, String> data = Maps.newHashMap();
+    data.put(1, "one");
+    data.put(2, "two");
+    data.put(3, "three");
+    pair.second.save(data, 1);
+
     File recoveryDir = new File("target/" + time);
-    Assert.assertTrue("recover path exist", recoveryDir.isDirectory());
-    FileUtils.deleteDirectory(recoveryDir);
+    Assert.assertTrue("recover filePath exist", recoveryDir.isDirectory());
+    pair.second.teardown();
+  }
+
+  private Pair<Context.OperatorContext, FSWindowDataManager> 
createManagerAndContextFor(int operatorId)
+  {
+    FSWindowDataManager dataManager = new FSWindowDataManager();
+    Context.OperatorContext context =  new 
OperatorContextTestHelper.TestIdOperatorContext(operatorId,
+        testMeta.attributes);
+
+    return new Pair<>(context, dataManager);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
----------------------------------------------------------------------
diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java 
b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
index cf8bb34..aefaac9 100644
--- 
a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
+++ 
b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java
@@ -206,7 +206,6 @@ public class FileSystemWALTest
     testMeta.fsWAL.teardown();
   }
 
-
   @Test
   public void testFinalizeAfterDelay() throws IOException
   {
@@ -452,8 +451,7 @@ public class FileSystemWALTest
     Assert.assertEquals("num tuples", expectedTuples, tuples);
   }
 
-  private static void write1KRecords(FileSystemWAL.FileSystemWALWriter writer, 
int numRecords)
-      throws IOException
+  private static void write1KRecords(FileSystemWAL.FileSystemWALWriter writer, 
int numRecords) throws IOException
   {
     for (int i = 0; i < numRecords; i++) {
       writer.append(getRandomSlice(1020));

Reply via email to