Repository: apex-malhar Updated Branches: refs/heads/master 9eadce12c -> 2153cd6ba
APEXMALHAR-2272 : Fixed sequentialFileRead on FSInputModule 1. Fixed StreamCodec to route all blocks for a file to same partition. 2. Fixed spelling for sequentialFileRead in the code, javadocs. 3.marked FSInputModule as evolving 4.added japicmp exclusion for FSInputModule Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2153cd6b Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2153cd6b Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2153cd6b Branch: refs/heads/master Commit: 2153cd6babadf11e7be63a137daa7dd2c3ef7230 Parents: 9eadce1 Author: yogidevendra <[email protected]> Authored: Sat Oct 1 08:19:56 2016 +0530 Committer: Priyanka Gugale <[email protected]> Committed: Mon Oct 17 15:35:32 2016 +0530 ---------------------------------------------------------------------- .../datatorrent/lib/io/fs/FSInputModule.java | 26 ++++----- .../datatorrent/lib/io/fs/S3InputModule.java | 2 +- .../lib/io/fs/FSInputModuleTest.java | 57 ++++++++++++++++++++ pom.xml | 1 + 4 files changed, 72 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java index d276dd7..d71111c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FSInputModule.java @@ -42,12 +42,12 @@ import com.datatorrent.netlet.util.Slice; * 4. recursive: if scan recursively input directories<br/> * 5. blockSize: block size used to read input blocks of file<br/> * 6. readersCount: count of readers to read input file<br/> - * 7. 
sequencialFileRead: If emit file blocks in sequence?<br/> + * 7. sequentialFileRead: If emit file blocks in sequence?<br/> * 8. blocksThreshold: number of blocks emitted per window * * @since 3.5.0 */ - +@org.apache.hadoop.classification.InterfaceStability.Evolving public class FSInputModule implements Module { @NotNull @@ -58,7 +58,7 @@ public class FSInputModule implements Module private long scanIntervalMillis; private boolean recursive = true; private long blockSize; - private boolean sequencialFileRead = false; + private boolean sequentialFileRead = false; private int readersCount; @Min(1) protected int blocksThreshold; @@ -89,7 +89,7 @@ public class FSInputModule implements Module blocksMetadataOutput.set(blockReader.blocksMetadataOutput); messages.set(blockReader.messages); - if (sequencialFileRead) { + if (sequentialFileRead) { dag.setInputPortAttribute(blockReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC, new SequentialFileBlockMetadataCodec()); } @@ -236,23 +236,23 @@ public class FSInputModule implements Module } /** - * Gets is sequencial file read + * Gets is sequential file read * - * @return sequencialFileRead + * @return sequentialFileRead */ - public boolean isSequencialFileRead() + public boolean isSequentialFileRead() { - return sequencialFileRead; + return sequentialFileRead; } /** - * Sets is sequencial file read + * Sets is sequential file read * - * @param sequencialFileRead + * @param sequentialFileRead */ - public void setSequencialFileRead(boolean sequencialFileRead) + public void setSequentialFileRead(boolean sequentialFileRead) { - this.sequencialFileRead = sequencialFileRead; + this.sequentialFileRead = sequentialFileRead; } /** @@ -283,7 +283,7 @@ public class FSInputModule implements Module @Override public int getPartition(BlockMetadata.FileBlockMetadata fileBlockMetadata) { - return fileBlockMetadata.hashCode(); + return fileBlockMetadata.getFilePath().hashCode(); } } }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java index bd55161..366eaa5 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/S3InputModule.java @@ -38,7 +38,7 @@ import com.datatorrent.lib.io.block.FSSliceReader; * 4. recursive: if scan recursively input directories<br/> * 5. blockSize: block size used to read input blocks of file<br/> * 6. readersCount: count of readers to read input file<br/> - * 7. sequencialFileRead: Is emit file blocks in sequence? + * 7. sequentialFileRead: Is emit file blocks in sequence? * * @since 3.5.0 */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java new file mode 100644 index 0000000..12301f6 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package com.datatorrent.lib.io.fs; + +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.fs.FSInputModule.SequentialFileBlockMetadataCodec; + +import static org.mockito.Mockito.when; + +public class FSInputModuleTest +{ + @Mock + BlockMetadata.FileBlockMetadata file1block1; + @Mock + BlockMetadata.FileBlockMetadata file1block2; + @Mock + BlockMetadata.FileBlockMetadata file2block1; + + @Test + public void testSequentialFileBlockMetadataCodec() + { + + MockitoAnnotations.initMocks(this); + when(file1block1.getFilePath()).thenReturn("file1"); + when(file1block2.getFilePath()).thenReturn("file1"); + when(file2block1.getFilePath()).thenReturn("file2"); + + SequentialFileBlockMetadataCodec codec = new SequentialFileBlockMetadataCodec(); + + Assert.assertEquals("Blocks of same file distributed to different partitions", codec.getPartition(file1block1), + codec.getPartition(file1block2)); + Assert.assertNotEquals("Blocks of different files distributed to same partition", codec.getPartition(file1block1), + codec.getPartition(file2block1)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2153cd6b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f9ce982..cb8bd93 100644 --- a/pom.xml +++ b/pom.xml @@ -134,6 +134,7 @@ <excludes> <exclude>@org.apache.hadoop.classification.InterfaceStability$Evolving</exclude>
<exclude>@org.apache.hadoop.classification.InterfaceStability$Unstable</exclude> + <exclude>com.datatorrent.lib.io.fs.FSInputModule</exclude> </excludes> </parameter> <skip>${semver.plugin.skip}</skip>
