Updated Branches:
  refs/heads/master 11ba4134e -> f6f965c4d

CRUNCH-112: Clean up FilterFn.

Move implementation classes to a new FilterFns utility class.
Add new ACCEPT_ALL and REJECT_ALL filters.
Rewrite test cases to use the new filters.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/f6f965c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/f6f965c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/f6f965c4

Branch: refs/heads/master
Commit: f6f965c4d9963eacb5584a85b667ed7148bccabb
Parents: 11ba413
Author: Matthias Friedrich <[email protected]>
Authored: Sun Nov 11 21:13:46 2012 +0100
Committer: Matthias Friedrich <[email protected]>
Committed: Sun Nov 11 21:13:46 2012 +0100

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/MRPipelineIT.java    |    8 +-
 .../it/java/org/apache/crunch/MaterializeIT.java   |   13 +--
 .../org/apache/crunch/PCollectionGetSizeIT.java    |   15 +--
 .../org/apache/crunch/lib/join/MapsideJoinIT.java  |   12 +--
 .../src/main/java/org/apache/crunch/FilterFn.java  |   21 +++-
 .../main/java/org/apache/crunch/fn/FilterFns.java  |  112 +++++++++++++++
 .../test/java/org/apache/crunch/FilterFnTest.java  |   63 --------
 .../java/org/apache/crunch/fn/FilterFnTest.java    |   85 +++++++++++
 8 files changed, 226 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java 
b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
index 8664550..7670e88 100644
--- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.To;
@@ -42,12 +43,7 @@ public class MRPipelineIT implements Serializable {
     Pipeline pipeline = new MRPipeline(MRPipelineIT.class, 
tmpDir.getDefaultConfiguration());
     PCollection<String> genericCollection = 
pipeline.readTextFile(textFile.getAbsolutePath());
     pipeline.run();
-    PCollection<String> filter = genericCollection.filter("Filtering data", 
new FilterFn<String>() {
-      @Override
-      public boolean accept(String input) {
-        return true;
-      }
-    });
+    PCollection<String> filter = genericCollection.filter("Filtering data", 
FilterFns.<String>ACCEPT_ALL());
     filter.materialize();
     pipeline.run();
     File file = tmpDir.getFile("output.txt");

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java 
b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
index 3b4f0e6..d064993 100644
--- a/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MaterializeIT.java
@@ -23,6 +23,7 @@ import static junit.framework.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.Person;
@@ -41,16 +42,6 @@ import com.google.common.collect.Lists;
 
 public class MaterializeIT {
 
-  /** Filter that rejects everything. */
-  @SuppressWarnings("serial")
-  private static class FalseFilterFn extends FilterFn<String> {
-
-    @Override
-    public boolean accept(final String input) {
-      return false;
-    }
-  }
-
   @Rule
   public TemporaryPath tmpDir = TemporaryPaths.create();
 
@@ -112,7 +103,7 @@ public class MaterializeIT {
   public void runMaterializeEmptyIntermediate(Pipeline pipeline, PTypeFamily 
typeFamily)
       throws IOException {
     String inputPath = tmpDir.copyResourceFileName("set1.txt");
-    PCollection<String> empty = pipeline.readTextFile(inputPath).filter(new 
FalseFilterFn());
+    PCollection<String> empty = 
pipeline.readTextFile(inputPath).filter(FilterFns.<String>REJECT_ALL());
 
     assertTrue(Lists.newArrayList(empty.materialize()).isEmpty());
     pipeline.done();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java 
b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
index 8182b30..44eb897 100644
--- a/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
+++ b/crunch/src/it/java/org/apache/crunch/PCollectionGetSizeIT.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertThat;
 
 import java.io.IOException;
 
+import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
@@ -43,16 +44,6 @@ public class PCollectionGetSizeIT {
   private String nonEmptyInputPath;
   private String outputPath;
 
-  /** Filter that rejects everything. */
-  @SuppressWarnings("serial")
-  private static class FalseFilterFn extends FilterFn<String> {
-
-    @Override
-    public boolean accept(final String input) {
-      return false;
-    }
-  }
-
   @Before
   public void setUp() throws IOException {
     emptyInputPath = tmpDir.copyResourceFileName("emptyTextFile.txt");
@@ -105,7 +96,7 @@ public class PCollectionGetSizeIT {
     PCollection<String> data = new MRPipeline(this.getClass(), 
tmpDir.getDefaultConfiguration())
       .readTextFile(nonEmptyInputPath);
 
-    PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
+    PCollection<String> emptyPCollection = 
data.filter(FilterFns.<String>REJECT_ALL());
 
     assertThat(emptyPCollection.getSize(), is(0L));
   }
@@ -139,7 +130,7 @@ public class PCollectionGetSizeIT {
 
     PCollection<String> data = pipeline.readTextFile(nonEmptyInputPath);
 
-    PCollection<String> emptyPCollection = data.filter(new FalseFilterFn());
+    PCollection<String> emptyPCollection = 
data.filter(FilterFns.<String>REJECT_ALL());
 
     emptyPCollection.write(sequenceFile(outputPath, strings()));
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java 
b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
index 9982ba4..297680e 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/join/MapsideJoinIT.java
@@ -24,11 +24,11 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.crunch.FilterFn;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
@@ -73,14 +73,6 @@ public class MapsideJoinIT {
 
   }
 
-  private static class NegativeFilter extends FilterFn<Pair<Integer, String>> {
-
-    @Override
-    public boolean accept(Pair<Integer, String> input) {
-      return false;
-    }
-
-  }
   @Rule
   public TemporaryPath tmpDir = TemporaryPaths.create();
 
@@ -96,7 +88,7 @@ public class MapsideJoinIT {
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
 
     PTable<Integer, String> filteredOrderTable = orderTable
-        .parallelDo(new NegativeFilter(), orderTable.getPTableType());
+        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), 
orderTable.getPTableType());
 
     PTable<Integer, Pair<String, String>> joined = 
MapsideJoin.join(customerTable, filteredOrderTable);
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java 
b/crunch/src/main/java/org/apache/crunch/FilterFn.java
index a9612ac..467400f 100644
--- a/crunch/src/main/java/org/apache/crunch/FilterFn.java
+++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java
@@ -19,12 +19,13 @@ package org.apache.crunch;
 
 import java.util.List;
 
+import org.apache.crunch.fn.FilterFns;
+
 import com.google.common.collect.ImmutableList;
 
 /**
  * A {@link DoFn} for the common case of filtering the members of a
  * {@link PCollection} based on a boolean condition.
- * 
  */
 public abstract class FilterFn<T> extends DoFn<T, T> {
 
@@ -45,10 +46,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
     return 0.5f;
   }
 
+  /**
+   * @deprecated Use {@link FilterFns#and(FilterFn...)}
+   */
   public static <S> FilterFn<S> and(FilterFn<S>... fns) {
     return new AndFn<S>(fns);
   }
 
+  /**
+   * @deprecated Use {@link FilterFns#and(FilterFn...)}
+   */
   public static class AndFn<S> extends FilterFn<S> {
 
     private final List<FilterFn<S>> fns;
@@ -77,10 +84,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
     }
   }
 
+  /**
+   * @deprecated Use {@link FilterFns#or(FilterFn...)}
+   */
   public static <S> FilterFn<S> or(FilterFn<S>... fns) {
     return new OrFn<S>(fns);
   }
 
+  /**
+   * @deprecated Use {@link FilterFns#or(FilterFn...)}
+   */
   public static class OrFn<S> extends FilterFn<S> {
 
     private final List<FilterFn<S>> fns;
@@ -109,10 +122,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
     }
   }
 
+  /**
+   * @deprecated Use {@link FilterFns#not(FilterFn)}
+   */
   public static <S> FilterFn<S> not(FilterFn<S> fn) {
     return new NotFn<S>(fn);
   }
 
+  /**
+   * @deprecated Use {@link FilterFns#not(FilterFn)}
+   */
   public static class NotFn<S> extends FilterFn<S> {
 
     private final FilterFn<S> base;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java 
b/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
new file mode 100644
index 0000000..8dc4268
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/fn/FilterFns.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import org.apache.crunch.FilterFn;
+import org.apache.crunch.FilterFn.AndFn;
+import org.apache.crunch.FilterFn.NotFn;
+import org.apache.crunch.FilterFn.OrFn;
+
+
+/**
+ * A collection of pre-defined {@link FilterFn} implementations.
+ */
+public final class FilterFns {
+  // Note: We delegate to the deprecated implementation classes in FilterFn. 
When their
+  //       time is up, we just move them here.
+
+  private FilterFns() {
+    // utility class, not for instantiation
+  }
+
+  /**
+   * Accept an entry if all of the given filters accept it, using 
short-circuit evaluation.
+   * @param fn1 The first function to delegate to
+   * @param fn2 The second function to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> and(FilterFn<S> fn1, FilterFn<S> fn2) {
+    return new AndFn<S>(fn1, fn2);
+  }
+
+  /**
+   * Accept an entry if all of the given filters accept it, using 
short-circuit evaluation.
+   * @param fns The functions to delegate to (in the given order)
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> and(FilterFn<S>... fns) {
+    return new AndFn<S>(fns);
+  }
+
+  /**
+   * Accept an entry if at least one of the given filters accepts it, using 
short-circuit evaluation.
+   * @param fn1 The first function to delegate to
+   * @param fn2 The second function to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> or(FilterFn<S> fn1, FilterFn<S> fn2) {
+    return new OrFn<S>(fn1, fn2);
+  }
+
+  /**
+   * Accept an entry if at least one of the given filters accepts it, using 
short-circuit evaluation.
+   * @param fns The functions to delegate to (in the given order)
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> or(FilterFn<S>... fns) {
+    return new OrFn<S>(fns);
+  }
+
+  /**
+   * Accept an entry if the given filter <em>does not</em> accept it.
+   * @param fn The function to delegate to
+   * @return The composed filter function
+   */
+  public static <S> FilterFn<S> not(FilterFn<S> fn) {
+    return new NotFn<S>(fn);
+  }
+
+  /**
+   * Accept everything.
+   * @return A filter function that accepts everything.
+   */
+  public static <S> FilterFn<S> ACCEPT_ALL() {
+    return new AcceptAllFn<S>();
+  }
+
+  /**
+   * Reject everything.
+   * @return A filter function that rejects everything.
+   */
+  public static <S> FilterFn<S> REJECT_ALL() {
+    return not(new AcceptAllFn<S>());
+  }
+
+  private static class AcceptAllFn<S> extends FilterFn<S> {
+    @Override
+    public boolean accept(S input) {
+      return true;
+    }
+
+    @Override
+    public float scaleFactor() {
+      return 1.0f;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/FilterFnTest.java 
b/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
deleted file mode 100644
index 9de086d..0000000
--- a/crunch/src/test/java/org/apache/crunch/FilterFnTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import org.junit.Test;
-
-@SuppressWarnings("serial")
-public class FilterFnTest {
-
-  private static final FilterFn<String> TRUE = new FilterFn<String>() {
-    @Override
-    public boolean accept(String input) {
-      return true;
-    }
-  };
-
-  private static final FilterFn<String> FALSE = new FilterFn<String>() {
-    @Override
-    public boolean accept(String input) {
-      return false;
-    }
-  };
-
-  @Test
-  public void testAnd() {
-    assertTrue(FilterFn.and(TRUE).accept("foo"));
-    assertTrue(FilterFn.and(TRUE, TRUE).accept("foo"));
-    assertFalse(FilterFn.and(TRUE, FALSE).accept("foo"));
-    assertFalse(FilterFn.and(FALSE, FALSE, FALSE).accept("foo"));
-  }
-
-  @Test
-  public void testOr() {
-    assertFalse(FilterFn.or(FALSE).accept("foo"));
-    assertTrue(FilterFn.or(FALSE, TRUE).accept("foo"));
-    assertTrue(FilterFn.or(TRUE, FALSE, TRUE).accept("foo"));
-    assertFalse(FilterFn.or(FALSE, FALSE, FALSE).accept("foo"));
-  }
-
-  @Test
-  public void testNot() {
-    assertFalse(FilterFn.not(TRUE).accept("foo"));
-    assertTrue(FilterFn.not(FALSE).accept("foo"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f6f965c4/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java 
b/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java
new file mode 100644
index 0000000..a649f99
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/fn/FilterFnTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.fn;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.crunch.FilterFn;
+import org.junit.Test;
+
+import com.google.common.base.Predicates;
+
+
+public class FilterFnTest {
+
+  private static final FilterFn<String> TRUE = FilterFns.<String>ACCEPT_ALL();
+  private static final FilterFn<String> FALSE = FilterFns.<String>REJECT_ALL();
+
+  @Test
+  public void testAcceptAll() {
+    assertThat(TRUE.accept(""), is(true));
+    assertThat(TRUE.accept("foo"), is(true));
+  }
+
+  @Test
+  public void testRejectAll() {
+    assertThat(FALSE.accept(""), is(false));
+    assertThat(FALSE.accept("foo"), is(false));
+
+    Predicates.or(Predicates.alwaysFalse(), Predicates.alwaysTrue());
+  }
+
+  @Test
+  public void testAnd() {
+    assertThat(FilterFns.and(TRUE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.and(TRUE, FALSE).accept("foo"), is(false));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGeneric() {
+    assertThat(FilterFns.and(TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.and(FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.and(FALSE, FALSE, FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.and(TRUE, TRUE, FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.and(FALSE, FALSE, FALSE, FALSE).accept("foo"), 
is(false));
+  }
+
+  @Test
+  public void testOr() {
+    assertThat(FilterFns.or(FALSE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(TRUE, FALSE).accept("foo"), is(true));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testOrGeneric() {
+    assertThat(FilterFns.or(TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(FALSE).accept("foo"), is(false));
+    assertThat(FilterFns.or(TRUE, FALSE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(FALSE, FALSE, TRUE).accept("foo"), is(true));
+    assertThat(FilterFns.or(FALSE, FALSE, FALSE).accept("foo"), is(false));
+  }
+
+  @Test
+  public void testNot() {
+    assertThat(FilterFns.not(TRUE).accept("foo"), is(false));
+    assertThat(FilterFns.not(FALSE).accept("foo"), is(true));
+  }
+}

Reply via email to