Repository: apex-malhar Updated Branches: refs/heads/master 176c5efe8 -> dd80369d4
APEXMALHAR-2460 Redshift output module unable to emit tuples.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/dd80369d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/dd80369d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/dd80369d

Branch: refs/heads/master
Commit: dd80369d4177bd8ce31b16562fbcdca792ae1ac7
Parents: 176c5ef
Author: deepak-narkhede <[email protected]>
Authored: Wed Mar 29 17:10:44 2017 +0530
Committer: deepak-narkhede <[email protected]>
Committed: Thu Apr 13 11:22:22 2017 +0530

----------------------------------------------------------------------
.../apex/malhar/lib/fs/s3/S3Reconciler.java | 4 +-
.../apex/malhar/lib/fs/s3/S3ReconcilerTest.java | 49 ++++++++++++++++++++
2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd80369d/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
index 1e7b68c..dc76e91 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java
@@ -174,11 +174,11 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator.
while (doneTuples.peek() != null) { FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll(); removeIntermediateFiles(metaData); - /*if (outputPort.isConnected()) { + if (outputPort.isConnected()) { // Emit the meta data with S3 path metaData.setPath(getDirectoryName() + Path.SEPARATOR + metaData.getFileName()); outputPort.emit(metaData); - }*/ + } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/dd80369d/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java index 9023b5c..d138b53 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java @@ -42,6 +42,7 @@ import com.amazonaws.services.s3.model.PutObjectResult; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; +import com.datatorrent.lib.testbench.CollectorTestSink; import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; import static org.mockito.Matchers.any; @@ -54,6 +55,7 @@ public class S3ReconcilerTest { S3Reconciler underTest; Context.OperatorContext context; + CollectorTestSink<Object> sink; @Mock AmazonS3 s3clientMock; @@ -102,6 +104,53 @@ public class S3ReconcilerTest public TestMeta testMeta = new TestMeta(); @Test + public void verifyS3ReconclierOutputTuple() throws Exception + { + String fileName = "s3-compaction_1.0"; + String path = testMeta.outputPath + Path.SEPARATOR + fileName; + long size = 80; + + File file = new File(path); + File tmpFile = new File(path + "." 
+ System.currentTimeMillis() + ".tmp"); + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < 10; i++) { + sb.append("Record" + i + "\n"); + if (i == 5) { + FileUtils.write(tmpFile, sb.toString()); + } + } + FileUtils.write(file, sb.toString()); + + // Set test sink and later on collect the emitted tuples in this sink. + testMeta.sink = new CollectorTestSink<Object>(); + testMeta.underTest.outputPort.setSink(testMeta.sink); + + // Create meta information to be emitted as tuple. + FSRecordCompactionOperator.OutputMetaData outputMetaData = new FSRecordCompactionOperator.OutputMetaData(path, fileName, size); + testMeta.underTest.beginWindow(0); + testMeta.underTest.input.process(outputMetaData); + testMeta.underTest.endWindow(); + + for (int i = 1; i < 60; i++) { + testMeta.underTest.beginWindow(i); + testMeta.underTest.endWindow(); + } + testMeta.underTest.committed(59); + + // retrieve the result count from output port. + testMeta.sink.waitForResultCount(1, 12000); + + for (int i = 60; i < 70; i++) { + testMeta.underTest.beginWindow(i); + Thread.sleep(10); + testMeta.underTest.endWindow(); + } + + // verify the number of tuples emitted. + Assert.assertEquals(1, testMeta.sink.getCount(false)); + } + + @Test public void testFileClearing() throws Exception { String fileName = "s3-compaction_1.0";
