Repository: apex-malhar Updated Branches: refs/heads/master 390b41c43 -> c0af266ae (forced update)
APEXMALHAR-2195 Fix LineReaderContext last record 1. Fixing ReaderContext Issue 2. Changes in the test app 3. Incorporating review comments 4. Graceful handling of test termination Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c0af266a Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c0af266a Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c0af266a Branch: refs/heads/master Commit: c0af266aeec4f1a7bfbb0fc4bc5516d515ea3650 Parents: 2b2d5bc Author: yogidevendra <[email protected]> Authored: Sun Aug 14 23:15:30 2016 +0530 Committer: yogidevendra <[email protected]> Committed: Fri Aug 26 12:36:04 2016 +0530 ---------------------------------------------------------------------- .../datatorrent/lib/io/block/ReaderContext.java | 16 +++++++++------ .../apex/malhar/lib/fs/FSRecordReaderTest.java | 21 ++++++++++---------- 2 files changed, 21 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c0af266a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java index 1ee6c53..6fe47a2 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java @@ -152,7 +152,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> private final transient ByteArrayOutputStream tmpBuilder; private transient byte[] buffer; - private transient String strBuffer; + private transient String bufferStr; private transient int posInStr; public LineReaderContext() @@ -190,11 +190,11 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> if (bytesRead == -1) { break; } - strBuffer = new String(buffer); + bufferStr = new String(buffer,0, bytesRead); } - while (posInStr < strBuffer.length()) { - char c = strBuffer.charAt(posInStr); + while (posInStr < bufferStr.length()) { + char c = bufferStr.charAt(posInStr); if (c != '\r' && c != '\n') { tmpBuilder.write(c); posInStr++; @@ -208,8 +208,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> lineBuilder.write(subLine); if (foundEOL) { - while (posInStr < strBuffer.length()) { - char c = strBuffer.charAt(posInStr); + while (posInStr < bufferStr.length()) { + char c = bufferStr.charAt(posInStr); if (c == '\r' || c == '\n') { emptyBuilder.write(c); posInStr++; @@ -219,6 +219,10 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> } usedBytes += emptyBuilder.toByteArray().length; } else { + //end of stream reached + if (bytesRead < bufferSize) { + break; + } //read more bytes from the input stream posInStr = 0; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c0af266a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java index fdd888c..ecaff70 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/FSRecordReaderTest.java @@ -90,14 +90,22 @@ public class FSRecordReaderTest LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(true); lc.runAsync(); - LOG.debug("Waiting for app to finish"); - Thread.sleep(1000 * 1); + + Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n"))); + expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n"))); + + while (DelimitedValidator.records.size() != expectedRecords.size()) { + LOG.debug("Waiting for app to finish"); + Thread.sleep(1000); + } lc.shutdown(); + Assert.assertEquals(expectedRecords, DelimitedValidator.records); + } public static class DelimitedValidator extends BaseOperator { - Set<String> records = new HashSet<String>(); + static Set<String> records = new HashSet<String>(); public final transient DefaultInputPort<byte[]> data = new DefaultInputPort<byte[]>() { @@ -110,13 +118,6 @@ public class FSRecordReaderTest } }; - public void teardown() - { - Set<String> expectedRecords = new HashSet<String>(Arrays.asList(FILE_1_DATA.split("\n"))); - expectedRecords.addAll(Arrays.asList(FILE_2_DATA.split("\n"))); - - Assert.assertEquals(expectedRecords, records); - } } private static class DelimitedApplication implements StreamingApplication
