Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 fd2f42bd9 -> 5373a3cb6
APEXMALHAR-2003 NPE in blockMetaDataIterator after recovery Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/289dad74 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/289dad74 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/289dad74 Branch: refs/heads/devel-3 Commit: 289dad7426ade1779733cead614d52946154211f Parents: d23e283 Author: Chandni Singh <[email protected]> Authored: Tue Feb 23 17:46:10 2016 -0800 Committer: Chandni Singh <[email protected]> Committed: Wed Feb 24 18:54:05 2016 -0800 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileSplitter.java | 6 +-- .../lib/io/fs/FileSplitterInputTest.java | 44 ++++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/289dad74/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java index afd86cf..cd47d48 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java @@ -285,7 +285,7 @@ public abstract class AbstractFileSplitter extends BaseOperator private long pos; private int blockNumber; - private final transient AbstractFileSplitter splitter; + private final AbstractFileSplitter splitter; protected BlockMetadataIterator() { @@ -319,8 +319,8 @@ public abstract class AbstractFileSplitter extends BaseOperator } boolean isLast = length >= fileMetadata.getFileLength(); long lengthOfFileInBlock = isLast ? fileMetadata.getFileLength() : length; - BlockMetadata.FileBlockMetadata fileBlock = splitter - .buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, fileMetadata, isLast); + BlockMetadata.FileBlockMetadata fileBlock = splitter.buildBlockMetadata(pos, lengthOfFileInBlock, blockNumber, + fileMetadata, isLast); pos = lengthOfFileInBlock; return fileBlock; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/289dad74/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index 2f605eb..cd0de2d 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -51,6 +51,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.block.BlockMetadata; import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KryoCloneUtils; import com.datatorrent.lib.util.TestUtils; /** @@ -455,6 +456,49 @@ public class FileSplitterInputTest testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath()); } + @Test + public void testRecoveryOfBlockMetadataIterator() throws InterruptedException + { + IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = + new IdempotentStorageManager.FSIdempotentStorageManager(); + + testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager); + testMeta.fileSplitterInput.setBlockSize(2L); + testMeta.fileSplitterInput.setBlocksThreshold(2); + testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); + + + testMeta.fileSplitterInput.setup(testMeta.context); + + testMeta.fileSplitterInput.beginWindow(1); + + ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); + testMeta.fileSplitterInput.emitTuples(); + testMeta.fileSplitterInput.endWindow(); + + //file0.txt has just 5 blocks. Since blocks threshold is 2, only 2 are emitted. + Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size()); + + testMeta.fileMetadataSink.clear(); + testMeta.blockMetadataSink.clear(); + + //At this point the operator was check-pointed and then there was a failure. + testMeta.fileSplitterInput.teardown(); + + //The operator was restored from persisted state and re-deployed. + testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput); + TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink); + TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink); + + testMeta.fileSplitterInput.setup(testMeta.context); + testMeta.fileSplitterInput.beginWindow(1); + + Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size()); + Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size()); + } + + private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner { transient Semaphore semaphore;
