Updated Branches:
  refs/heads/master 29de385be -> dd3ae01ae

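CRUNCH-135: Remove sample and sort from PCollection interface

Callers now use the static helpers Sample.sample(...) and Sort.sort(...) in
org.apache.crunch.lib instead. PTable also gains filter(...) overloads that
return a PTable rather than a plain PCollection.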


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/dd3ae01a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/dd3ae01a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/dd3ae01a

Branch: refs/heads/master
Commit: dd3ae01ae88707dc5490a2e8294fb6c31ef11e25
Parents: 29de385
Author: Josh Wills <[email protected]>
Authored: Tue Dec 18 11:14:59 2012 -0800
Committer: Josh Wills <[email protected]>
Committed: Tue Dec 18 11:45:05 2012 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/scrunch/PCollection.scala    |    6 ++--
 .../src/it/java/org/apache/crunch/lib/SortIT.java  |    8 ++++--
 .../main/java/org/apache/crunch/PCollection.java   |   18 ---------------
 crunch/src/main/java/org/apache/crunch/PTable.java |   17 ++++++++++++++
 .../crunch/impl/mem/collect/MemCollection.java     |   17 --------------
 .../apache/crunch/impl/mem/collect/MemTable.java   |   11 +++++++++
 .../crunch/impl/mr/collect/PCollectionImpl.java    |   17 --------------
 .../apache/crunch/impl/mr/collect/PTableBase.java  |   11 +++++++++
 .../java/org/apache/crunch/lib/SampleTest.java     |    5 ++-
 9 files changed, 50 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index ac2242f..49ee6c0 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConversions
 
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
 import org.apache.crunch.{PCollection => JCollection, Pair => CPair}
-import org.apache.crunch.lib.{Aggregate, Cartesian}
+import org.apache.crunch.lib.{Aggregate, Cartesian, Sample}
 import org.apache.crunch.scrunch.Conversions._
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 
@@ -77,11 +77,11 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
   def min()(implicit converter: Converter[S, S]) = PObject(Aggregate.min(native))(converter)
 
   def sample(acceptanceProbability: Double) = {
-    wrap(native.sample(acceptanceProbability))
+    wrap(Sample.sample(native, acceptanceProbability))
   }
 
   def sample(acceptanceProbability: Double, seed: Long) = {
-    wrap(native.sample(acceptanceProbability, seed))
+    wrap(Sample.sample(native, seed, acceptanceProbability))
   }
 
   def pType = native.getPType()

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
index 4a22a51..3ea31ca 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/SortIT.java
@@ -151,15 +151,17 @@ public class SortIT implements Serializable {
   public void testAvroReflectSortPair() throws IOException {
     Pipeline pipeline = new MRPipeline(SortIT.class, tmpDir.getDefaultConfiguration());
     pipeline.enableDebug();
-    PCollection<Pair<String, StringWrapper>> sorted = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
+    String rsrc = tmpDir.copyResourceFileName("set2.txt");
+    PCollection<Pair<String, StringWrapper>> in = pipeline.readTextFile(rsrc)
         .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
 
           @Override
           public Pair<String, StringWrapper> map(String input) {
             return Pair.of(input, wrap(input));
           }
-        }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class))).sort(true);
-
+        }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class)));
+    PCollection<Pair<String, StringWrapper>> sorted = Sort.sort(in, Order.ASCENDING);
+    
     List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
     expected.add(Pair.of("a", wrap("a")));
     expected.add(Pair.of("c", wrap("c")));

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index f5a3465..00c300f 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -176,12 +176,6 @@ public interface PCollection<S> {
   <K> PTable<K, S> by(String name, MapFn<S, K> extractKeyFn, PType<K> keyType);
 
   /**
-   * Returns a {@code PCollection} instance that contains all of the elements of
-   * this instance in sorted order.
-   */
-  PCollection<S> sort(boolean ascending);
-
-  /**
    * Returns a {@code PTable} instance that contains the counts of each unique
    * element of this PCollection.
    */
@@ -196,16 +190,4 @@ public interface PCollection<S> {
    * Returns a {@code PObject} of the minimum element of this instance.
    */
   PObject<S> min();
-
-  /**
-   * Randomly sample items from this PCollection instance with the given
-   * probability of an item being accepted.
-   */
-  PCollection<S> sample(double acceptanceProbability);
-
-  /**
-   * Randomly sample items from this PCollection instance with the given
-   * probability of an item being accepted and using the given seed.
-   */
-  PCollection<S> sample(double acceptanceProbability, long seed);
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java
index e827603..b754a2c 100644
--- a/crunch/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch/src/main/java/org/apache/crunch/PTable.java
@@ -89,6 +89,23 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
   PTable<K, Collection<V>> collectValues();
 
   /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PTable}.
+   */
+  PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn);
+  
+  /**
+   * Apply the given filter function to this instance and return the resulting
+   * {@code PTable}.
+   *
+   * @param name
+   *          An identifier for this processing step
+   * @param filterFn
+   *          The {@code FilterFn} to apply
+   */
+  PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn);
+  
+  /**
    * Returns a PTable made up of the pairs in this PTable with the largest value
    * field.
    *

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index 61bb1e7..35f64ce 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -37,8 +37,6 @@ import org.apache.crunch.fn.ExtractKeyFn;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
 import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Sample;
-import org.apache.crunch.lib.Sort;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -183,16 +181,6 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
-  public PCollection<S> sample(double acceptanceProbability) {
-    return Sample.sample(this, acceptanceProbability);
-  }
-
-  @Override
-  public PCollection<S> sample(double acceptanceProbability, long seed) {
-    return Sample.sample(this, seed, acceptanceProbability);
-  }
-
-  @Override
   public PObject<S> max() {
     return Aggregate.max(this);
   }
@@ -203,11 +191,6 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
-  public PCollection<S> sort(boolean ascending) {
-    return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
-  }
-
-  @Override
   public PCollection<S> filter(FilterFn<S> filterFn) {
     return parallelDo(filterFn, getPType());
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 56dc69a..524d492 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PGroupedTable;
@@ -107,6 +108,16 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
+  public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(filterFn, getPTableType());
+  }
+  
+  @Override
+  public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(name, filterFn, getPTableType());
+  }
+
+  @Override
   public PTable<K, V> top(int count) {
     return Aggregate.top(this, count, true);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index f0d8187..79fe72b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -37,8 +37,6 @@ import org.apache.crunch.fn.ExtractKeyFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.plan.DoNode;
 import org.apache.crunch.lib.Aggregate;
-import org.apache.crunch.lib.Sample;
-import org.apache.crunch.lib.Sort;
 import org.apache.crunch.materialize.pobject.CollectionPObject;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
@@ -153,11 +151,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
-  public PCollection<S> sort(boolean ascending) {
-    return Sort.sort(this, ascending ? Sort.Order.ASCENDING : Sort.Order.DESCENDING);
-  }
-
-  @Override
   public PTable<S, Long> count() {
     return Aggregate.count(this);
   }
@@ -178,16 +171,6 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
-  public PCollection<S> sample(double acceptanceProbability) {
-    return Sample.sample(this, acceptanceProbability);
-  }
-
-  @Override
-  public PCollection<S> sample(double acceptanceProbability, long seed) {
-    return Sample.sample(this, seed, acceptanceProbability);
-  }
-
-  @Override
   public PTypeFamily getTypeFamily() {
     return getPType().getFamily();
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index 9183784..03c2fdc 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -80,6 +81,16 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P
   }
 
   @Override
+  public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(filterFn, getPTableType());
+  }
+  
+  @Override
+  public PTable<K, V> filter(String name, FilterFn<Pair<K, V>> filterFn) {
+    return parallelDo(name, filterFn, getPTableType());
+  }
+  
+  @Override
   public PTable<K, V> top(int count) {
     return Aggregate.top(this, count, true);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/dd3ae01a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
index 0f75fb6..69fd074 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/SampleTest.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.List;
 
+import org.apache.crunch.PCollection;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.junit.Test;
 
@@ -29,8 +30,8 @@ import com.google.common.collect.ImmutableList;
 public class SampleTest {
   @Test
   public void testSampler() {
-    Iterable<Integer> sample = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).sample(0.2, 123998)
-        .materialize();
+    PCollection<Integer> pcollect = MemPipeline.collectionOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    Iterable<Integer> sample = Sample.sample(pcollect, 123998, 0.2).materialize();
     List<Integer> sampleValues = ImmutableList.copyOf(sample);
     assertEquals(ImmutableList.of(6, 7), sampleValues);
   }

Reply via email to