Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 fd2f42bd9 -> 5373a3cb6


APEXMALHAR-2003 NPE in blockMetaDataIterator after recovery


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/289dad74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/289dad74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/289dad74

Branch: refs/heads/devel-3
Commit: 289dad7426ade1779733cead614d52946154211f
Parents: d23e283
Author: Chandni Singh <[email protected]>
Authored: Tue Feb 23 17:46:10 2016 -0800
Committer: Chandni Singh <[email protected]>
Committed: Wed Feb 24 18:54:05 2016 -0800

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileSplitter.java         |  6 +--
 .../lib/io/fs/FileSplitterInputTest.java        | 44 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/289dad74/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
index afd86cf..cd47d48 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java
@@ -285,7 +285,7 @@ public abstract class AbstractFileSplitter extends BaseOperator
     private long pos;
     private int blockNumber;
 
-    private final transient AbstractFileSplitter splitter;
+    private final AbstractFileSplitter splitter;
 
     protected BlockMetadataIterator()
     {
@@ -319,8 +319,8 @@ public abstract class AbstractFileSplitter extends BaseOperator
       }
       boolean isLast = length >= fileMetadata.getFileLength();
       long lengthOfFileInBlock = isLast ? fileMetadata.getFileLength() : length;
-      BlockMetadata.FileBlockMetadata fileBlock = splitter
-          .buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast);
+      BlockMetadata.FileBlockMetadata fileBlock = splitter.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber,
+          fileMetadata, isLast);
       pos = lengthOfFileInBlock;
       return fileBlock;
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/289dad74/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
index 2f605eb..cd0de2d 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java
@@ -51,6 +51,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.io.block.BlockMetadata;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.lib.util.TestUtils;
 
 /**
@@ -455,6 +456,49 @@ public class FileSplitterInputTest
         testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath());
   }
 
+  @Test
+  public void testRecoveryOfBlockMetadataIterator() throws InterruptedException
+  {
+    IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager =
+        new IdempotentStorageManager.FSIdempotentStorageManager();
+
+    testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager);
+    testMeta.fileSplitterInput.setBlockSize(2L);
+    testMeta.fileSplitterInput.setBlocksThreshold(2);
+    testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500);
+
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire();
+    testMeta.fileSplitterInput.emitTuples();
+    testMeta.fileSplitterInput.endWindow();
+
+    //file0.txt has just 5 blocks. Since blocks threshold is 2, only 2 are emitted.
+    Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+
+    testMeta.fileMetadataSink.clear();
+    testMeta.blockMetadataSink.clear();
+
+    //At this point the operator was check-pointed and then there was a failure.
+    testMeta.fileSplitterInput.teardown();
+
+    //The operator was restored from persisted state and re-deployed.
+    testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput);
+    TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink);
+    TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink);
+
+    testMeta.fileSplitterInput.setup(testMeta.context);
+    testMeta.fileSplitterInput.beginWindow(1);
+
+    Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size());
+    Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size());
+  }
+
+
   private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner
   {
     transient Semaphore semaphore;

Reply via email to