Repository: apex-malhar
Updated Branches:
  refs/heads/master 882e4568a -> 451250818


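APEXMALHAR-2119 Add setters for partition count and index to DirectoryScanner, add an overridable getPartition() hook, and create partitions by Kryo-cloning the scanner instead of constructing a new DirectoryScanner.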


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9c45be55
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9c45be55
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9c45be55

Branch: refs/heads/master
Commit: 9c45be55050b711a22b412a0417caf99862985a9
Parents: 3e7b76b
Author: Tushar R. Gosavi <[email protected]>
Authored: Wed Jun 15 17:12:16 2016 +0530
Committer: Tushar R. Gosavi <[email protected]>
Committed: Tue Jun 21 10:50:26 2016 +0530

----------------------------------------------------------------------
 .../lib/io/fs/AbstractFileInputOperator.java    | 20 +++++-
 .../io/fs/AbstractFileInputOperatorTest.java    | 71 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c45be55/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index dae5c7f..a3de5f3 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -1056,10 +1056,15 @@ public abstract class AbstractFileInputOperator<T>
       return pathSet;
     }
 
+    protected int getPartition(String filePathStr)
+    {
+      return filePathStr.hashCode();
+    }
+
     protected boolean acceptFile(String filePathStr)
     {
       if (partitionCount > 1) {
-        int i = filePathStr.hashCode();
+        int i = getPartition(filePathStr);
         int mod = i % partitionCount;
         if (mod < 0) {
           mod += partitionCount;
@@ -1096,7 +1101,8 @@ public abstract class AbstractFileInputOperator<T>
 
     protected DirectoryScanner createPartition(int partitionIndex, int partitionCount)
     {
-      DirectoryScanner that = new DirectoryScanner();
+      KryoCloneUtils<DirectoryScanner> cloneUtils = KryoCloneUtils.createCloneUtils(this);
+      DirectoryScanner that = cloneUtils.getClone();
       that.filePatternRegexp = this.filePatternRegexp;
       that.regex = this.regex;
       that.partitionIndex = partitionIndex;
@@ -1110,6 +1116,16 @@ public abstract class AbstractFileInputOperator<T>
       return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" +
           partitionIndex + " partitionCount=" + partitionCount + "]";
     }
+
+    protected void setPartitionIndex(int partitionIndex)
+    {
+      this.partitionIndex = partitionIndex;
+    }
+
+    protected void setPartitionCount(int partitionCount)
+    {
+      this.partitionCount = partitionCount;
+    }
   }
 
   protected static class RecoveryEntry

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c45be55/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
index aa295b1..7990049 100644
--- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 import org.junit.Assert;
@@ -867,4 +868,74 @@ public class AbstractFileInputOperatorTest
       return null;
     }
   }
+
+  /** scanner to extract partition id from start of the filename */
+  static class MyScanner extends AbstractFileInputOperator.DirectoryScanner
+  {
+    @Override
+    protected int getPartition(String filePathStr)
+    {
+      String[] parts = filePathStr.split("/");
+      parts = parts[parts.length - 1].split("_");
+      try {
+        int code = Integer.parseInt(parts[0]);
+        return code;
+      } catch (NumberFormatException ex) {
+        return super.getPartition(filePathStr);
+      }
+    }
+  }
+
+  /**
+   * Partition the operator in 2
+   * create ten files with index of the file at the start, i.e 1_file, 2_file .. etc.
+   * The scanner returns this index from getPartition method.
+   * each partition should read 5 files as file index are from 0 to 9 (including 0 and 9).
+   * @throws Exception
+   */
+  @Test
+  public void testWithCustomScanner() throws Exception
+  {
+    LineByLineFileInputOperator oper = new LineByLineFileInputOperator();
+    oper.setScanner(new MyScanner());
+    oper.getScanner().setFilePatternRegexp(".*partition_([\\d]*)");
+    oper.setDirectory(new File(testMeta.dir).getAbsolutePath());
+
+    Random rand = new Random();
+    Path path = new Path(new File(testMeta.dir).getAbsolutePath());
+    FileContext.getLocalFSFileContext().delete(path, true);
+    for (int file = 0; file < 10; file++) {
+      FileUtils.write(new File(testMeta.dir, file + "_partition_00" + rand.nextInt(100)), "");
+    }
+
+    List<Partition<AbstractFileInputOperator<String>>> partitions = Lists.newArrayList();
+    partitions.add(new DefaultPartition<AbstractFileInputOperator<String>>(oper));
+    Collection<Partition<AbstractFileInputOperator<String>>> newPartitions = oper.definePartitions(partitions,
+        new PartitioningContextImpl(null, 2));
+    Assert.assertEquals(2, newPartitions.size());
+    Assert.assertEquals(1, oper.getCurrentPartitions()); // partitioned() wasn't called
+
+    for (Partition<AbstractFileInputOperator<String>> p : newPartitions) {
+      Assert.assertNotSame(oper, p.getPartitionedInstance());
+      Assert.assertNotSame(oper.getScanner(), p.getPartitionedInstance().getScanner());
+      Set<String> consumed = Sets.newHashSet();
+      LinkedHashSet<Path> files = p.getPartitionedInstance().getScanner().scan(FileSystem.getLocal(new Configuration(false)), path, consumed);
+      Assert.assertEquals("partition " + files, 5, files.size());
+    }
+  }
+
+  @Test
+  public void testCustomScanner()
+  {
+    MyScanner scanner = new MyScanner();
+    scanner.setPartitionCount(2);
+
+    scanner.setPartitionIndex(1);
+    boolean accepted = scanner.acceptFile("1_file");
+    Assert.assertTrue("File should be accepted by this partition ", accepted);
+
+    scanner.setPartitionIndex(0);
+    accepted = scanner.acceptFile("1_file");
+    Assert.assertFalse("File should not be accepted by this partition ", accepted);
+  }
 }

Reply via email to