Repository: apex-malhar Updated Branches: refs/heads/master 882e4568a -> 451250818
APEXMALHAR-2119 add setters for partition count and index. Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9c45be55 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9c45be55 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9c45be55 Branch: refs/heads/master Commit: 9c45be55050b711a22b412a0417caf99862985a9 Parents: 3e7b76b Author: Tushar R. Gosavi <[email protected]> Authored: Wed Jun 15 17:12:16 2016 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Tue Jun 21 10:50:26 2016 +0530 ---------------------------------------------------------------------- .../lib/io/fs/AbstractFileInputOperator.java | 20 +++++- .../io/fs/AbstractFileInputOperatorTest.java | 71 ++++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c45be55/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index dae5c7f..a3de5f3 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -1056,10 +1056,15 @@ public abstract class AbstractFileInputOperator<T> return pathSet; } + protected int getPartition(String filePathStr) + { + return filePathStr.hashCode(); + } + protected boolean acceptFile(String filePathStr) { if (partitionCount > 1) { - int i = filePathStr.hashCode(); + int i = getPartition(filePathStr); int mod = i % partitionCount; if (mod < 0) { mod += partitionCount; @@ -1096,7 +1101,8 @@ public abstract class AbstractFileInputOperator<T> protected DirectoryScanner createPartition(int partitionIndex, int partitionCount) { - DirectoryScanner that = new DirectoryScanner(); + KryoCloneUtils<DirectoryScanner> cloneUtils = KryoCloneUtils.createCloneUtils(this); + DirectoryScanner that = cloneUtils.getClone(); that.filePatternRegexp = this.filePatternRegexp; that.regex = this.regex; that.partitionIndex = partitionIndex; @@ -1110,6 +1116,16 @@ public abstract class AbstractFileInputOperator<T> return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" + partitionIndex + " partitionCount=" + partitionCount + "]"; } + + protected void setPartitionIndex(int partitionIndex) + { + this.partitionIndex = partitionIndex; + } + + protected void setPartitionCount(int partitionCount) + { + this.partitionCount = partitionCount; + } } protected static class RecoveryEntry http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c45be55/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index aa295b1..7990049 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import org.junit.Assert; @@ -867,4 +868,74 @@ public class AbstractFileInputOperatorTest return null; } } + + /** scanner to extract partition id from start of the filename */ + static class MyScanner extends AbstractFileInputOperator.DirectoryScanner + { + @Override + protected int getPartition(String filePathStr) + { + String[] parts = filePathStr.split("/"); + parts = parts[parts.length - 1].split("_"); + try { + int code = Integer.parseInt(parts[0]); + return code; + } catch (NumberFormatException ex) { + return super.getPartition(filePathStr); + } + } + } + + /** + * Partition the operator in 2 + * create ten files with index of the file at the start, i.e 1_file, 2_file .. etc. + * The scanner returns this index from getPartition method. + * each partition should read 5 files as file index are from 0 to 9 (including 0 and 9). + * @throws Exception + */ + @Test + public void testWithCustomScanner() throws Exception + { + LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); + oper.setScanner(new MyScanner()); + oper.getScanner().setFilePatternRegexp(".*partition_([\\d]*)"); + oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); + + Random rand = new Random(); + Path path = new Path(new File(testMeta.dir).getAbsolutePath()); + FileContext.getLocalFSFileContext().delete(path, true); + for (int file = 0; file < 10; file++) { + FileUtils.write(new File(testMeta.dir, file + "_partition_00" + rand.nextInt(100)), ""); + } + + List<Partition<AbstractFileInputOperator<String>>> partitions = Lists.newArrayList(); + partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper)); + Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions, + new PartitioningContextImpl(null, 2)); + Assert.assertEquals(2, newPartitions.size()); + Assert.assertEquals(1, oper.getCurrentPartitions()); // partitioned() wasn't called + + for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { + Assert.assertNotSame(oper, p.getPartitionedInstance()); + Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner()); + Set<String> consumed = Sets.newHashSet(); + LinkedHashSet<Path> files = p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed); + Assert.assertEquals("partition " + files, 5, files.size()); + } + } + + @Test + public void testCustomScanner() + { + MyScanner scanner = new MyScanner(); + scanner.setPartitionCount(2); + + scanner.setPartitionIndex(1); + boolean accepted = scanner.acceptFile("1_file"); + Assert.assertTrue("File should be accepted by this partition ", accepted); + + scanner.setPartitionIndex(0); + accepted = scanner.acceptFile("1_file"); + Assert.assertFalse("File should not be accepted by this partition ", accepted); + } }
