Repository: apex-malhar Updated Branches: refs/heads/master cae42df3c -> 833cbc251
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index 1b8efff..e5a2832 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -18,7 +18,6 @@ */ package com.datatorrent.lib.io.fs; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -30,7 +29,6 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestWatcher; @@ -44,9 +42,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.google.common.collect.Sets; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; @@ -106,56 +101,75 @@ public class FileSplitterInputTest fileSplitterInput = new FileSplitterInput(); fileSplitterInput.setBlocksThreshold(100); scanner = new MockScanner(); + scanner.setScanIntervalMillis(500); + scanner.setFilePatternRegularExp(".*[.]txt"); + scanner.setFiles(dataDirectory); fileSplitterInput.setScanner(scanner); - fileSplitterInput.getScanner().setScanIntervalMillis(500); - fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt"); - fileSplitterInput.getScanner().setFiles(dataDirectory); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(Context.DAGContext.APPLICATION_PATH, "target/" + className + "/" + methodName + "/" + Long.toHexString(System.currentTimeMillis())); context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes); - fileSplitterInput.setup(context); - fileMetadataSink = new CollectorTestSink<>(); - TestUtils.setSink(fileSplitterInput.filesMetadataOutput, fileMetadataSink); - blockMetadataSink = new CollectorTestSink<>(); - TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, blockMetadataSink); + resetSinks(); } @Override protected void finished(Description description) { filePaths.clear(); - this.fileSplitterInput.teardown(); TestUtils.deleteTargetTestClassFolder(description); } + + private void resetSinks() + { + TestUtils.setSink(fileSplitterInput.filesMetadataOutput, fileMetadataSink); + TestUtils.setSink(fileSplitterInput.blocksMetadataOutput, blockMetadataSink); + } + + private void updateConfig(FSWindowDataManager fsWindowDataManager, + long scanInterval, long blockSize, int blocksThreshold) + { + fileSplitterInput.setWindowDataManager(fsWindowDataManager); + fileSplitterInput.getScanner().setScanIntervalMillis(scanInterval); + fileSplitterInput.setBlockSize(blockSize); + fileSplitterInput.setBlocksThreshold(blocksThreshold); + } } @Rule public TestMeta testMeta = new TestMeta(); - - @Test - public void testFileMetadata() throws InterruptedException + + private void window1TestHelper() throws InterruptedException { testMeta.fileSplitterInput.beginWindow(1); testMeta.scanner.semaphore.acquire(); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); + Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size()); + for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) { FileSplitterInput.FileMetadata metadata = (FileSplitterInput.FileMetadata)fileMetadata; Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); Assert.assertNotNull("name: ", metadata.getFileName()); } - + testMeta.fileMetadataSink.collectedTuples.clear(); } @Test + public void testFileMetadata() throws InterruptedException + { + testMeta.fileSplitterInput.setup(testMeta.context); + window1TestHelper(); + testMeta.fileSplitterInput.teardown(); + } + + @Test public void testScannerFilterForDuplicates() throws InterruptedException { String filePath = testMeta.dataDirectory + Path.SEPARATOR + "file0.txt"; @@ -164,6 +178,7 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); testMeta.fileSplitterInput.getScanner().setFilePatternRegularExp(".*[.]txt"); testMeta.fileSplitterInput.getScanner().setFiles(filePath); + testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); testMeta.scanner.semaphore.acquire(); @@ -185,11 +200,13 @@ public class FileSplitterInputTest } testMeta.fileMetadataSink.collectedTuples.clear(); + testMeta.fileSplitterInput.teardown(); } @Test public void testBlockMetadataNoSplit() throws InterruptedException { + testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); testMeta.scanner.semaphore.acquire(); @@ -199,12 +216,15 @@ public class FileSplitterInputTest BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata; Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); } + testMeta.fileSplitterInput.teardown(); } @Test public void testBlockMetadataWithSplit() throws InterruptedException { testMeta.fileSplitterInput.setBlockSize(2L); + + testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); testMeta.scanner.semaphore.acquire(); @@ -218,6 +238,7 @@ public class FileSplitterInputTest noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); } Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size()); + testMeta.fileSplitterInput.teardown(); } @Test @@ -226,25 +247,31 @@ public class FileSplitterInputTest FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager(); testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager); - fsIdempotentStorageManager.setup(testMeta.context); + testMeta.fileSplitterInput.setup(testMeta.context); //will emit window 1 from data directory - testFileMetadata(); + window1TestHelper(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); + testMeta.fileSplitterInput.teardown(); - fsIdempotentStorageManager.setup(testMeta.context); + testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput); + testMeta.resetSinks(); + + testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size()); for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) { BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata; Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); } + testMeta.fileSplitterInput.teardown(); } @Test public void testTimeScan() throws InterruptedException, IOException, TimeoutException { - testFileMetadata(); + testMeta.fileSplitterInput.setup(testMeta.context); + window1TestHelper(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); @@ -265,13 +292,16 @@ public class FileSplitterInputTest Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size()); + testMeta.fileSplitterInput.teardown(); } @Test public void testTrigger() throws InterruptedException, IOException, TimeoutException { testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000); - testFileMetadata(); + + testMeta.fileSplitterInput.setup(testMeta.context); + window1TestHelper(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); @@ -293,51 +323,55 @@ public class FileSplitterInputTest Assert.assertEquals("window 2: files", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size()); + testMeta.fileSplitterInput.teardown(); } - - @Test - public void testBlocksThreshold() throws InterruptedException + + private void blocksTestHelper() throws InterruptedException { - int noOfBlocks = 0; - for (int i = 0; i < 12; i++) { - File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt"); - noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); - } - - testMeta.fileSplitterInput.setBlockSize(2L); - testMeta.fileSplitterInput.setBlocksThreshold(10); testMeta.fileSplitterInput.beginWindow(1); - testMeta.scanner.semaphore.acquire(); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); - + Assert.assertEquals("Blocks", 10, testMeta.blockMetadataSink.collectedTuples.size()); - + for (int window = 2; window < 8; window++) { testMeta.fileSplitterInput.beginWindow(window); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); } + + int noOfBlocks = 0; + for (int i = 0; i < 12; i++) { + File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt"); + noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); + } Assert.assertEquals("Files", 12, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size()); } @Test - public void testIdempotencyWithBlocksThreshold() throws InterruptedException + public void testBlocksThreshold() throws InterruptedException { - FSWindowDataManager fsWindowDataManager = new FSWindowDataManager(); - testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager); + testMeta.fileSplitterInput.setBlockSize(2L); testMeta.fileSplitterInput.setBlocksThreshold(10); - testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); - fsWindowDataManager.setup(testMeta.context); + testMeta.fileSplitterInput.setup(testMeta.context); + blocksTestHelper(); + testMeta.fileSplitterInput.teardown(); + } - testBlocksThreshold(); + private void recoveryTestHelper() throws InterruptedException + { + blocksTestHelper(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); + testMeta.fileSplitterInput.teardown(); + + testMeta.fileSplitterInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput); + testMeta.resetSinks(); - fsWindowDataManager.setup(testMeta.context); + testMeta.fileSplitterInput.setup(testMeta.context); for (int i = 1; i < 8; i++) { testMeta.fileSplitterInput.beginWindow(i); } @@ -346,9 +380,25 @@ public class FileSplitterInputTest } @Test + public void testIdempotencyWithBlocksThreshold() throws InterruptedException + { + FSWindowDataManager fsWindowDataManager = new FSWindowDataManager(); + testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10); + + testMeta.fileSplitterInput.setup(testMeta.context); + recoveryTestHelper(); + testMeta.fileSplitterInput.teardown(); + } + + @Test public void testFirstWindowAfterRecovery() throws IOException, InterruptedException { - testIdempotencyWithBlocksThreshold(); + FSWindowDataManager fsWindowDataManager = new FSWindowDataManager(); + testMeta.updateConfig(fsWindowDataManager, 500, 2L, 10); + testMeta.fileSplitterInput.setup(testMeta.context); + + recoveryTestHelper(); + Thread.sleep(1000); HashSet<String> lines = Sets.newHashSet(); for (int line = 2; line < 4; line++) { @@ -362,34 +412,28 @@ public class FileSplitterInputTest testMeta.blockMetadataSink.clear(); testMeta.fileSplitterInput.beginWindow(8); - testMeta.scanner.semaphore.acquire(); + ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); Assert.assertEquals("Files", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("Blocks", 6, testMeta.blockMetadataSink.collectedTuples.size()); + testMeta.fileSplitterInput.teardown(); } - @Ignore + @Test public void testRecoveryOfPartialFile() throws InterruptedException { FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager(); - testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager); - testMeta.fileSplitterInput.setBlockSize(2L); - testMeta.fileSplitterInput.setBlocksThreshold(2); - testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); + testMeta.updateConfig(fsIdempotentStorageManager, 500L, 2L, 2); - Kryo kryo = new Kryo(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - Output loutput = new Output(bos); - kryo.writeObject(loutput, testMeta.fileSplitterInput); - loutput.close(); + FileSplitterInput checkpointedInput = KryoCloneUtils.cloneObject(testMeta.fileSplitterInput); testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); - ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); + testMeta.scanner.semaphore.acquire(); testMeta.fileSplitterInput.emitTuples(); testMeta.fileSplitterInput.endWindow(); @@ -403,11 +447,8 @@ public class FileSplitterInputTest testMeta.fileSplitterInput.teardown(); //there was a failure and the operator was re-deployed - Input lInput = new Input(bos.toByteArray()); - testMeta.fileSplitterInput = kryo.readObject(lInput, testMeta.fileSplitterInput.getClass()); - lInput.close(); - TestUtils.setSink(testMeta.fileSplitterInput.blocksMetadataOutput, testMeta.blockMetadataSink); - TestUtils.setSink(testMeta.fileSplitterInput.filesMetadataOutput, testMeta.fileMetadataSink); + testMeta.fileSplitterInput = checkpointedInput; + testMeta.resetSinks(); testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); @@ -440,13 +481,14 @@ public class FileSplitterInputTest testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1)); Assert.assertTrue("Block file name 1", testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2)); + testMeta.fileSplitterInput.teardown(); } @Test public void testRecursive() throws InterruptedException, IOException { - testMeta.fileSplitterInput.getScanner().regex = null; - testFileMetadata(); + testMeta.fileSplitterInput.setup(testMeta.context); + window1TestHelper(); testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); @@ -467,14 +509,13 @@ public class FileSplitterInputTest Assert.assertEquals("window 2: files", 2, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("window 2: blocks", 1, testMeta.blockMetadataSink.collectedTuples.size()); + testMeta.fileSplitterInput.teardown(); } @Test public void testSingleFile() throws InterruptedException, IOException { - testMeta.fileSplitterInput.teardown(); testMeta.fileSplitterInput.setScanner(new MockScanner()); - testMeta.fileSplitterInput.getScanner().regex = null; testMeta.fileSplitterInput.getScanner().setFiles(testMeta.dataDirectory + "/file1.txt"); testMeta.fileSplitterInput.setup(testMeta.context); @@ -486,21 +527,16 @@ public class FileSplitterInputTest Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(), testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath()); + testMeta.fileSplitterInput.teardown(); } @Test public void testRecoveryOfBlockMetadataIterator() throws InterruptedException { FSWindowDataManager fsWindowDataManager = new FSWindowDataManager(); - - testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager); - testMeta.fileSplitterInput.setBlockSize(2L); - testMeta.fileSplitterInput.setBlocksThreshold(2); - testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); - - - fsWindowDataManager.setup(testMeta.context); - + testMeta.updateConfig(fsWindowDataManager, 500L, 2L, 2); + + testMeta.fileSplitterInput.setup(testMeta.context); testMeta.fileSplitterInput.beginWindow(1); ((MockScanner)testMeta.fileSplitterInput.getScanner()).semaphore.acquire(); @@ -527,13 +563,17 @@ public class FileSplitterInputTest Assert.assertEquals("Recovered Files", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("Recovered Blocks", 2, testMeta.blockMetadataSink.collectedTuples.size()); + + testMeta.fileSplitterInput.teardown(); } @Test public void testFileModificationTest() throws InterruptedException, IOException, TimeoutException { testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(60 * 1000); - testFileMetadata(); + testMeta.fileSplitterInput.setup(testMeta.context); + window1TestHelper(); + testMeta.fileMetadataSink.clear(); testMeta.blockMetadataSink.clear(); @@ -573,13 +613,13 @@ public class FileSplitterInputTest Assert.assertEquals("window 2: files", 0, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("window 2: blocks", 0, testMeta.blockMetadataSink.collectedTuples.size()); - + + testMeta.fileSplitterInput.teardown(); } @Test public void testMultipleNestedInput() throws IOException, InterruptedException { - testMeta.fileSplitterInput.teardown(); File subDir = new File(testMeta.dataDirectory, "subDir"); subDir.mkdir(); File file = new File(subDir, "file.txt"); @@ -612,12 +652,12 @@ public class FileSplitterInputTest } testMeta.fileMetadataSink.collectedTuples.clear(); + testMeta.fileSplitterInput.teardown(); } @Test public void testEmptyDirCopy() throws InterruptedException { - testMeta.fileSplitterInput.teardown(); File emptyDir = new File(testMeta.dataDirectory, "emptyDir"); emptyDir.mkdirs(); testMeta.fileSplitterInput.setScanner(new MockScanner()); @@ -632,6 +672,7 @@ public class FileSplitterInputTest Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("Empty directory not copied.", emptyDir.getName(), testMeta.fileMetadataSink.collectedTuples.get(0).getFileName()); + testMeta.fileSplitterInput.teardown(); } private static class MockScanner extends FileSplitterInput.TimeBasedDirectoryScanner @@ -653,6 +694,6 @@ public class FileSplitterInputTest super.scanIterationComplete(); } } - + private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInputTest.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java index 01febe3..24ab938 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java @@ -346,7 +346,7 @@ public class FileSplitterTest public void testRecoveryOfPartialFile() throws InterruptedException { FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager(); - fsIdempotentStorageManager.setRecoveryPath(testMeta.dataDirectory + '/' + "recovery"); + fsIdempotentStorageManager.setStatePath(testMeta.dataDirectory + '/' + "recovery"); testMeta.fileSplitter.setWindowDataManager(fsIdempotentStorageManager); testMeta.fileSplitter.setBlockSize(2L); testMeta.fileSplitter.setBlocksThreshold(2); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java index 82f1c67..f0b44e6 100644 --- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java @@ -23,7 +23,6 @@ import java.io.File; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; @@ -41,7 +40,6 @@ import org.apache.commons.io.FileUtils; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; -import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -127,7 +125,7 @@ public class JMSStringInputOperatorTest testMeta.operator.activate(testMeta.context); Assert.assertEquals("largest recovery window", 1, - testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); + testMeta.operator.getWindowDataManager().getLargestCompletedWindow()); testMeta.operator.beginWindow(1); testMeta.operator.endWindow(); @@ -135,39 +133,6 @@ public class JMSStringInputOperatorTest testMeta.sink.collectedTuples.clear(); } - @Test - public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception - { - testMeta.operator = new JMSStringInputOperator() - { - @Override - protected void acknowledge() throws JMSException - { - throw new RuntimeException("fail ack"); - } - }; - testMeta.operator.setSubject("TEST.FOO"); - testMeta.operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost"); - - testMeta.operator.setup(testMeta.context); - testMeta.operator.activate(testMeta.context); - - produceMsg(10); - Thread.sleep(1000); - testMeta.operator.beginWindow(1); - testMeta.operator.emitTuples(); - try { - testMeta.operator.endWindow(); - } catch (Throwable t) { - LOG.debug("ack failed"); - } - testMeta.operator.setup(testMeta.context); - testMeta.operator.activate(testMeta.context); - - Assert.assertEquals("window 1 should not exist", Stateless.WINDOW_ID, - testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); - } - private void produceMsg(int numMessages) throws Exception { // Create a ConnectionFactory http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java index 53e787d..9b7083d 100644 --- a/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java @@ -197,7 +197,7 @@ public class SQSStringInputOperatorTest testMeta.operator.activate(testMeta.context); Assert.assertEquals("largest recovery window", 1, - testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); + testMeta.operator.getWindowDataManager().getLargestCompletedWindow()); testMeta.operator.beginWindow(1); testMeta.operator.endWindow(); @@ -254,7 +254,7 @@ public class SQSStringInputOperatorTest testMeta.operator.activate(testMeta.context); Assert.assertEquals("window 1 should exist", 1, - testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); + testMeta.operator.getWindowDataManager().getLargestCompletedWindow()); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java index 2e35b0a..ce8052a 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java @@ -98,14 +98,15 @@ public class IncrementalCheckpointManagerTest { testMeta.checkpointManager.setup(testMeta.managedStateContext); Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0); - testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false); + testMeta.checkpointManager.save(buckets5, 10, false); testMeta.checkpointManager.teardown(); - testMeta.checkpointManager = new IncrementalCheckpointManager(); + KryoCloneUtils<IncrementalCheckpointManager> cloneUtils = KryoCloneUtils.createCloneUtils(testMeta.checkpointManager); + testMeta.checkpointManager = cloneUtils.getClone(); testMeta.checkpointManager.setup(testMeta.managedStateContext); @SuppressWarnings("unchecked") Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5After = (Map<Long, Map<Slice, Bucket.BucketedValue>>) - testMeta.checkpointManager.load(testMeta.operatorId, 10); + testMeta.checkpointManager.retrieve(10); Assert.assertEquals("saved", buckets5, buckets5After); testMeta.checkpointManager.teardown(); @@ -117,12 +118,12 @@ public class IncrementalCheckpointManagerTest testMeta.checkpointManager.setup(testMeta.managedStateContext); Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0); - testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false); + testMeta.checkpointManager.save(buckets5, 10, false); //Need to synchronously call transfer window files so shutting down the other thread. testMeta.checkpointManager.teardown(); Thread.sleep(500); - testMeta.checkpointManager.committed(testMeta.operatorId, 10); + testMeta.checkpointManager.committed(10); testMeta.checkpointManager.transferWindowFiles(); for (int i = 0; i < 5; i++) { @@ -143,8 +144,8 @@ public class IncrementalCheckpointManagerTest testMeta.checkpointManager.setup(testMeta.managedStateContext); Map<Long, Map<Slice, Bucket.BucketedValue>> data = ManagedStateTestUtils.getTestData(0, 5, 0); - testMeta.checkpointManager.save(data, testMeta.operatorId, 10, false); - testMeta.checkpointManager.committed(testMeta.operatorId, 10); + testMeta.checkpointManager.save(data, 10, false); + testMeta.checkpointManager.committed(10); latch.await(); testMeta.checkpointManager.teardown(); Thread.sleep(500); @@ -183,8 +184,8 @@ public class IncrementalCheckpointManagerTest } @Override - protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) - throws IOException + protected void writeBucketData(long windowId, long bucketId, Map<Slice, + Bucket.BucketedValue> data) throws IOException { super.writeBucketData(windowId, bucketId, data); if (windowId == 10) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java index 21d5b76..9939bb9 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java @@ -20,9 +20,8 @@ package org.apache.apex.malhar.lib.wal; import java.io.File; import java.io.IOException; -import java.util.Arrays; +import java.util.List; import java.util.Map; -import java.util.TreeSet; import org.junit.Assert; import org.junit.Rule; @@ -30,13 +29,6 @@ import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -44,6 +36,7 @@ import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.Pair; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.util.TestUtils; @@ -56,20 +49,17 @@ public class FSWindowDataManagerTest { String applicationPath; - FSWindowDataManager storageManager; - Context.OperatorContext context; + Attribute.AttributeMap.DefaultAttributeMap attributes; @Override protected void starting(Description description) { TestUtils.deleteTargetTestClassFolder(description); super.starting(description); - storageManager = new FSWindowDataManager(); applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); - Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(DAG.APPLICATION_PATH, applicationPath); - context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes); } @Override @@ -85,31 +75,39 @@ public class FSWindowDataManagerTest @Test public void testLargestRecoveryWindow() { - testMeta.storageManager.setup(testMeta.context); - Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow()); - testMeta.storageManager.teardown(); + Pair<Context.OperatorContext, FSWindowDataManager> pair = createManagerAndContextFor(1); + pair.second.setup(pair.first); + Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, pair.second.getLargestCompletedWindow()); + pair.second.teardown(); } @Test public void testSave() throws IOException { - testMeta.storageManager.setup(testMeta.context); + Pair<Context.OperatorContext, FSWindowDataManager> pair = createManagerAndContextFor(1); + pair.second.setup(pair.first); Map<Integer, String> data = Maps.newHashMap(); data.put(1, "one"); data.put(2, "two"); data.put(3, "three"); - testMeta.storageManager.save(data, 1, 1); - testMeta.storageManager.setup(testMeta.context); + pair.second.save(data, 1); + + pair.second.setup(pair.first); @SuppressWarnings("unchecked") - Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1); - Assert.assertEquals("dataOf1", data, decoded); - testMeta.storageManager.teardown(); + Map<Integer, String> artifact = (Map<Integer, String>)pair.second.retrieve(1); + Assert.assertEquals("dataOf1", data, artifact); + pair.second.teardown(); } @Test - public void testLoad() throws IOException + public void testRetrieve() throws IOException { - testMeta.storageManager.setup(testMeta.context); + Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1); + Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2); + + pair1.second.setup(pair1.first); + pair2.second.setup(pair2.first); + Map<Integer, String> dataOf1 = Maps.newHashMap(); dataOf1.put(1, "one"); dataOf1.put(2, "two"); @@ -120,25 +118,30 @@ public class FSWindowDataManagerTest dataOf2.put(5, "five"); dataOf2.put(6, "six"); - testMeta.storageManager.save(dataOf1, 1, 1); - testMeta.storageManager.save(dataOf2, 2, 1); - testMeta.storageManager.setup(testMeta.context); - Map<Integer, Object> decodedStates = testMeta.storageManager.load(1); - Assert.assertEquals("no of states", 2, decodedStates.size()); - for (Integer operatorId : decodedStates.keySet()) { - if (operatorId == 1) { - Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1)); - } else { - Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2)); - } - } - testMeta.storageManager.teardown(); + pair1.second.save(dataOf1, 1); + pair2.second.save(dataOf2, 1); + + pair1.second.setup(pair1.first); + Object artifact1 = pair1.second.retrieve(1); + Assert.assertEquals("data of 1", dataOf1, artifact1); + + pair2.second.setup(pair2.first); + Object artifact2 = pair2.second.retrieve(1); + Assert.assertEquals("data of 2", dataOf2, artifact2); + + pair1.second.teardown(); + pair2.second.teardown(); } @Test - public void testRecovery() throws IOException + public void testRetrieveAllPartitions() throws IOException { - testMeta.storageManager.setup(testMeta.context); + Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1); + Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2); + + pair1.second.setup(pair1.first); + pair2.second.setup(pair2.first); + Map<Integer, String> dataOf1 = Maps.newHashMap(); dataOf1.put(1, "one"); dataOf1.put(2, "two"); @@ -149,37 +152,103 @@ public class FSWindowDataManagerTest dataOf2.put(5, "five"); dataOf2.put(6, "six"); - testMeta.storageManager.save(dataOf1, 1, 1); - testMeta.storageManager.save(dataOf2, 2, 2); + pair1.second.save(dataOf1, 1); + pair2.second.save(dataOf2, 1); + + pair1.second.teardown(); + pair2.second.teardown(); - testMeta.storageManager.setup(testMeta.context); - Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow()); - testMeta.storageManager.teardown(); + List<WindowDataManager> managers = pair1.second.partition(3, null); + + managers.get(0).setup(pair1.first); + Map<Integer, Object> artifacts = managers.get(0).retrieveAllPartitions(1); + Assert.assertEquals("num artifacts", 2, artifacts.size()); + + Assert.assertEquals("artifact 1", dataOf1, artifacts.get(1)); + Assert.assertEquals("artifact 2", dataOf2, artifacts.get(2)); + + managers.get(0).teardown(); } @Test - public void testGetWindowIds() throws IOException + public void testRecovery() throws IOException { - testMeta.storageManager.setup(testMeta.context); - Map<Integer, String> data = Maps.newHashMap(); - data.put(1, "one"); - data.put(2, "two"); - data.put(3, "three"); + Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1); + Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2); + + pair1.second.setup(pair1.first); + pair2.second.setup(pair2.first); + + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + Map<Integer, String> dataOf2 = Maps.newHashMap(); + dataOf2.put(4, "four"); + dataOf2.put(5, "five"); + dataOf2.put(6, "six"); - testMeta.storageManager.save(data, 1, 1); - testMeta.storageManager.save(data, 2, 2); + pair1.second.save(dataOf1, 1); + pair2.second.save(dataOf2, 2); - testMeta.storageManager.setup(testMeta.context); + pair1.second.setup(pair1.first); + Assert.assertEquals("largest recovery window", 1, pair1.second.getLargestCompletedWindow()); - Assert.assertArrayEquals(new long[] {1, 2}, testMeta.storageManager.getWindowIds()); + pair2.second.setup(pair2.first); + Assert.assertEquals("largest recovery window", 2, pair2.second.getLargestCompletedWindow()); - testMeta.storageManager.teardown(); - } + pair1.second.teardown(); + pair2.second.teardown(); + WindowDataManager manager = pair1.second.partition(1, Sets.newHashSet(2)).get(0); + manager.setup(pair1.first); + Assert.assertEquals("largest recovery window", 1, manager.getLargestCompletedWindow()); + manager.teardown(); + } + @Test public void testDelete() throws IOException { - testMeta.storageManager.setup(testMeta.context); + Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1); + pair1.second.getWal().setMaxLength(2); + pair1.second.setup(pair1.first); + + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + for (int i = 1; i <= 9; ++i) { + pair1.second.save(dataOf1, i); + } + + pair1.second.committed(3); + pair1.second.teardown(); + + Pair<Context.OperatorContext, FSWindowDataManager> pair1AfterRecovery = createManagerAndContextFor(1); + testMeta.attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L); + pair1AfterRecovery.second.setup(pair1AfterRecovery.first); + + Assert.assertEquals("window 1 deleted", null, pair1AfterRecovery.second.retrieve(1)); + Assert.assertEquals("window 3 deleted", null, pair1AfterRecovery.second.retrieve(3)); + + Assert.assertEquals("window 4 exists", dataOf1, pair1AfterRecovery.second.retrieve(4)); + pair1.second.teardown(); + } + + @Test + public void testDeleteDoesNotRemoveTmpFiles() throws IOException + { + Pair<Context.OperatorContext, FSWindowDataManager> pair1 = createManagerAndContextFor(1); + pair1.second.setup(pair1.first); + + Pair<Context.OperatorContext, FSWindowDataManager> pair2 = createManagerAndContextFor(2); + pair2.second.setup(pair2.first); + + Pair<Context.OperatorContext, FSWindowDataManager> pair3 = createManagerAndContextFor(3); + pair3.second.setup(pair3.first); + Map<Integer, String> dataOf1 = Maps.newHashMap(); dataOf1.put(1, "one"); dataOf1.put(2, "two"); @@ -196,41 +265,65 @@ public class FSWindowDataManagerTest dataOf2.put(9, "nine"); for (int i = 1; i <= 9; ++i) { - testMeta.storageManager.save(dataOf1, 1, i); + pair1.second.save(dataOf1, i); + } + + for (int i = 1; i <= 6; ++i) { + pair2.second.save(dataOf2, i); } - testMeta.storageManager.save(dataOf2, 2, 1); - testMeta.storageManager.save(dataOf3, 3, 1); - - testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager), - Sets.newHashSet(2, 3)); - testMeta.storageManager.setup(testMeta.context); - testMeta.storageManager.deleteUpTo(1, 6); - - Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath()); - FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration()); - FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1))); - Assert.assertEquals("number of windows for 1", 3, fileStatuses.length); - TreeSet<String> windows = Sets.newTreeSet(); - for (FileStatus fileStatus : fileStatuses) { - windows.add(fileStatus.getPath().getName()); + for (int i = 1; i <= 3; ++i) { + pair3.second.save(dataOf3, i); } - Assert.assertEquals("window list for 1", Sets.newTreeSet(Arrays.asList("7", "8", "9")), windows); - Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2)))); - Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3)))); - testMeta.storageManager.teardown(); + + pair1.second.teardown(); + pair2.second.teardown(); + pair3.second.teardown(); + + FSWindowDataManager fsManager = (FSWindowDataManager)pair1.second.partition(1, Sets.newHashSet(2, 3)).get(0); + fsManager.setup(pair1.first); + + Assert.assertEquals("recovery window", 3, fsManager.getLargestCompletedWindow()); + + Map<Integer, Object> artifacts = fsManager.retrieveAllPartitions(1); + Assert.assertEquals("num artifacts", 3, artifacts.size()); + + fsManager.committed(3); + fsManager.teardown(); + + testMeta.attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 3L); + fsManager.setup(pair1.first); + Assert.assertEquals("recovery window", Stateless.WINDOW_ID, fsManager.getLargestCompletedWindow()); + fsManager.teardown(); } @Test public void testAbsoluteRecoveryPath() throws IOException { - testMeta.storageManager.setRecoveryPathRelativeToAppPath(false); + Pair<Context.OperatorContext, FSWindowDataManager> pair = createManagerAndContextFor(1); + pair.second.setStatePathRelativeToAppPath(false); long time = System.currentTimeMillis(); - testMeta.storageManager.setRecoveryPath("target/" + time); - testSave(); + pair.second.setStatePath("target/" + time); + + pair.second.setup(pair.first); + Map<Integer, String> data = Maps.newHashMap(); + data.put(1, "one"); + data.put(2, "two"); + data.put(3, "three"); + pair.second.save(data, 1); + File recoveryDir = new File("target/" + time); - Assert.assertTrue("recover path exist", recoveryDir.isDirectory()); - FileUtils.deleteDirectory(recoveryDir); + Assert.assertTrue("recover filePath exist", recoveryDir.isDirectory()); + pair.second.teardown(); + } + + private Pair<Context.OperatorContext, FSWindowDataManager> createManagerAndContextFor(int operatorId) + { + FSWindowDataManager dataManager = new FSWindowDataManager(); + Context.OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(operatorId, + testMeta.attributes); + + return new Pair<>(context, dataManager); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java index cf8bb34..aefaac9 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/wal/FileSystemWALTest.java @@ -206,7 +206,6 @@ public class FileSystemWALTest testMeta.fsWAL.teardown(); } - @Test public void testFinalizeAfterDelay() throws IOException { @@ -452,8 +451,7 @@ public class FileSystemWALTest Assert.assertEquals("num tuples", expectedTuples, tuples); } - private static void write1KRecords(FileSystemWAL.FileSystemWALWriter writer, int numRecords) - throws IOException + private static void write1KRecords(FileSystemWAL.FileSystemWALWriter writer, int numRecords) throws IOException { for (int i = 0; i < numRecords; i++) { writer.append(getRandomSlice(1020));
